Mounting ADLS with Databricks

Mounting an ADLS container using a SAS token

# Container name
# Storage account name
# Storage account SAS token
# Mount point name (it could be anything /mnt/.....)

# Code template
dbutils.fs.mount(
    source='wasbs://<container-name>@<storage-account-name>.blob.core.windows.net',
    mount_point='<mount-point-name>',
    extra_configs={'fs.azure.sas.<container-name>.<storage-account-name>.blob.core.windows.net': '<SAS-Token>'})

# Example
dbutils.fs.mount(
    source='wasbs://landing@savimaladls.blob.core.windows.net',
    mount_point='/mnt/abc2',
    extra_configs={'fs.azure.sas.landing.savimaladls.blob.core.windows.net': '?sv=2021-06-08&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2022-09-26T12:11:56Z&st=2022-09-26T04:11:56Z&spr=https&sig=XO45qIekFi%2FJV9z%2F5mec5DicdsGvgGCdbzxoykEDd5i0M%3D'})

# In the next cell, verify the mount
%fs ls /mnt/abc2


Mounting an ADLS container using a SAS token stored in a Databricks secret scope backed by Azure Key Vault

# Container name
# Storage account name
# Mount point name (it could be anything /mnt/.....)
# Databricks secret scope name
# Azure key vault key name containing the secret info

# Code template
dbutils.fs.mount(
    source='wasbs://<container-name>@<storage-account-name>.blob.core.windows.net',
    mount_point='<mount-point-name>',
    extra_configs={'fs.azure.sas.<container-name>.<storage-account-name>.blob.core.windows.net': dbutils.secrets.get(scope="<databricks-secret-scope-name>", key="<key-vault-secret-name>")})

# Example
dbutils.fs.mount(
    source='wasbs://landing@savimaladls.blob.core.windows.net',
    mount_point='/mnt/abc2',
    extra_configs={'fs.azure.sas.landing.savimaladls.blob.core.windows.net': dbutils.secrets.get(scope="dbscopemissionbatch2", key="dbserviceprincipalkey")})

Mounting an ADLS container using an App Registration (service principal)
# Container name
# Storage account name
# Tenant Id
# Application Id
# Secret value
# Mount point name (it could be anything /mnt/.....)

# Code template
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<application-id>",
           "fs.azure.account.oauth2.client.secret": "<service-credential-key>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant-id>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

dbutils.fs.mount(source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net",
                 mount_point = "<mount-point-name>",
                 extra_configs = configs)

# Example
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "d9c97ba4-8943-4b06-b9d5-eb239baf9113",
           "fs.azure.account.oauth2.client.secret": "uwsajdbwj3u392smsknas",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/3cc553a5-eec4-4b67-9863-7e488c9fa455/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

dbutils.fs.mount(source = "abfss://landing@savimaladls.dfs.core.windows.net",
                 mount_point = "/mnt/mission100landing",
                 extra_configs = configs)

Mounting an ADLS container using an App Registration with a secret scope and Key Vault

# Container name
# Storage account name
# Tenant Id
# Application Id
# Mount point name (it could be anything /mnt/.....)
# Databricks secret scope name
# Azure Key Vault secret name containing the secret info

# Code template

configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<application-id>",
           "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="<databricks-secret-scope-name>", key="<key-vault-secret-name>"),
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant-id>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

dbutils.fs.mount(source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net",
                 mount_point = "<mount-point-name>",
                 extra_configs = configs)


# Example
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "d9c97ba4-8943-4b06-b9d5-eb239baf9113",
           "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="Batch5_Databricks_Secret_Scope", key="APPSECRET"),
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/3cc553a5-eec4-4b67-9863-7e488c9fa455/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

dbutils.fs.mount(source = "abfss://landing@savimaladls.dfs.core.windows.net",
                 mount_point = "/mnt/mission100landing",
                 extra_configs = configs)


Check all the mount points
print(dbutils.fs.mounts())

How to unmount the mount point
dbutils.fs.unmount("<mount-point-name-to-be-unmounted>")

How to Refresh mount points
dbutils.fs.refreshMounts()

To see all mount-related functions, use this help function
dbutils.fs.help()
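
Before mounting, it is worth checking whether the mount point already exists, since calling dbutils.fs.mount() on an existing mount point raises an error. A minimal sketch, reusing the SAS-token example above (the mount point, scope, and key names are just the sample values from this page):

# Mount only if the mount point is not already present
mount_point = "/mnt/abc2"

if not any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source='wasbs://landing@savimaladls.blob.core.windows.net',
        mount_point=mount_point,
        extra_configs={'fs.azure.sas.landing.savimaladls.blob.core.windows.net': dbutils.secrets.get(scope="dbscopemissionbatch2", key="dbserviceprincipalkey")})
else:
    print(mount_point + " is already mounted")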

Azure + Azure Data Factory + Synapse + Databricks + Python Points

 

1)      Azure Databricks is a powerful platform built on top of Apache Spark and designed specifically for huge data analytics. Setting it up and deploying it to Azure takes just a few minutes, and once it's there, using it is quite easy. Because of its seamless connectivity with other Azure services, Databricks is an excellent choice for data engineers who want to work with big amounts of data in the cloud.

2)      DBU stands for Databricks Unit, the unit of processing capability per hour that Databricks uses for handling resources and calculating prices.

3)      Azure Databricks comes with many benefits including reduced costs, increased productivity, and increased security.

4)      Azure Databricks has four types of clusters, including Interactive, Job, Low-priority, and High-priority.

5)      Caching: The cache refers to the practice of storing information temporarily. When you go to a website that you visit frequently, your browser takes the information from the cache instead of the server. This helps save time and reduce the server’s load.

6)      Autoscaling is a Databricks feature that will help you automatically scale your cluster in whichever direction you need.

7)      KAFKA: When Azure Databricks gathers data, it establishes connections to hubs and data sources like Kafka. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

8)    DBFS: The Databricks File System (DBFS) is used to store the data that is saved in Databricks. Workloads involving large amounts of data are an ideal fit for this particular distributed file system. DBFS is compatible with the Hadoop Distributed File System (HDFS).

9)    What is the difference between an instance and a cluster in Databricks? An instance is a virtual machine that helps run the Databricks runtime. A cluster is a group of instances that are used to run Spark applications.

10)   Management Plane: The management plane is how you manage and monitor your Databricks deployment.

11)   Control Plane: The control plane is responsible for managing Spark applications.

12)   Data Plane: The data plane is responsible for storing and processing data.

13)   The Databricks runtime is often used to execute the Databricks platform’s collection of modules.

14)   Databricks Secret: A secret is a key-value pair that stores secret content, identified by a unique key name within a secret scope. Each scope is limited to 1000 secrets, and a secret value cannot exceed 128 KB in size.
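
The secret utilities can be explored directly from a notebook. A minimal sketch (the scope and key names are reused from the mounting examples above):

# List scopes and secrets, then read one secret
print(dbutils.secrets.listScopes())
print(dbutils.secrets.list("dbscopemissionbatch2"))

token = dbutils.secrets.get(scope="dbscopemissionbatch2", key="dbserviceprincipalkey")
# The value is redacted if you try to print it in a notebook cell.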

15)   Any information that is stored in the Databricks Delta format is stored in a table that is referred to as a delta table. Delta tables, in addition to being fully compliant with ACID transactions, also make it possible for reads and writes to take place at lightning speed.

16)   An application environment that is created on top of Apache Spark is referred to as the Databricks Runtime. It provides everything you need to construct and run Spark applications, such as libraries, application programming interfaces (APIs), and tools

17)   Dataframe: A data frame is a particular form of table that is used for the storage of data within the Databricks runtime. There is complete support for ACID transactions, and data frames were developed with the goal of providing fast reads and writes.

The Dataset is an extension of the DataFrame with added features like type safety and an object-oriented interface. A DataFrame is defined as a distributed collection of data organized into named columns.

18)   Temporary views in Spark SQL are session-scoped and will disappear if the session that created them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. A global temporary view is tied to the system-preserved database global_temp, and we must use the qualified name to refer to it, e.g. SELECT * FROM global_temp.view1.
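
A minimal sketch of both view types (the DataFrame df and the view names are placeholders):

# Session-scoped vs global temporary views
df.createOrReplaceTempView("sales_tmp")             # visible only in this Spark session
df.createOrReplaceGlobalTempView("sales_global")    # visible to every session of this application

spark.sql("SELECT * FROM sales_tmp").show()
spark.sql("SELECT * FROM global_temp.sales_global").show()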

Azure Synapse Analytics.

It was developed specifically to manage tables with hundreds of millions of rows. Because it is based on a Massively Parallel Processing, or MPP, architecture, Synapse SQL is able to conduct complicated queries and provide the query answers within seconds, even when working with large amounts of data. This is made possible by the fact that Azure Synapse Analytics can distribute data processing across numerous nodes.

19)   The Dedicated SQL Pool of Azure Synapse Analytics is a collection of technologies that enables you to leverage the platform typically utilized for enterprise data warehousing. Resources are provisioned in Data Warehousing Units (DWU) with the help of Synapse SQL. A dedicated SQL pool improves the efficiency of queries and decreases the amount of data storage required by storing information in relational tables with columnar storage.

Star Schema vs Snowflake Schema

Star: Hierarchies for the dimensions are stored in the dimension table.
Snowflake: Hierarchies are divided into separate tables.

Star: Contains a fact table surrounded by dimension tables.
Snowflake: One fact table surrounded by dimension tables, which are in turn surrounded by dimension tables.

Star: A single join creates the relationship between the fact table and any dimension table.
Snowflake: Many joins are required to fetch the data.

Star: Simple DB design.
Snowflake: Very complex DB design.

Star: Denormalized data structure; queries run faster.
Snowflake: Normalized data structure.

Star: High level of data redundancy.
Snowflake: Very low level of data redundancy.

Star: A single dimension table contains aggregated data.
Snowflake: Data is split into different dimension tables.

Star: Cube processing is faster.
Snowflake: Cube processing might be slow because of the complex joins.

Star: Offers higher-performing queries using Star Join Query Optimization; tables may be connected to multiple dimensions.
Snowflake: Represented by a centralized fact table which is unlikely to be connected to multiple dimensions.

 

20)   OLAP:  An OLAP system is designed to process large amounts of data quickly, allowing users to analyze multiple data dimensions in tandem. Teams can use this data for decision-making and problem-solving. OLAP’s multidimensional schema is well suited for complex queries that draw from multiple data sets, such as historical and current data, including from OLTP sources as mentioned. OLAP systems are designed to process queries that include thousands to millions of rows of data. Data is updated hourly to daily depending on the needs of the organization. OLAP databases process significantly more data, so their response times are slower. OLAP systems require massive amounts of data storage capacity to function

Organizations use OLTP systems to run their business while OLAP systems help them understand their business. 

21)   OLTP: OLTP systems are designed to handle large volumes of transactional data involving multiple users. Relational databases rapidly update, insert, or delete small amounts of data in real time. Most OLTP systems are used for executing transactions such as online hotel bookings, mobile banking transactions, ecommerce purchases, and in-store checkout. Many OLAP systems pull their data from OLTP databases via an ETL pipeline and can provide insights such as analyzing ATM activity and performance over time. An OLTP system stores transaction data in a relational database, optimized to handle the large volumes of transactional data funneled into this system. OLTP systems typically update a few rows of data at a time in real time or near real time. OLTP systems are also backed up much more frequently than OLAP systems—due to OLTP’s nature as a transaction processing tool, regular backups are required to maintain business operations and comply with relevant legal and regulatory requirements. OLTP systems have response times that are measured in milliseconds. OLTP systems have relatively modest data storage requirements

What is Azure Data Factory?

Cloud-based integration service that allows creating data-driven workflows in the cloud for orchestrating and automating data movement and data transformation.

·         Using Azure data factory, you can create and schedule the data-driven workflows(called pipelines) that can ingest data from disparate data stores.

·         It can process and transform the data by using compute services such as HDInsight Hadoop, Spark, Azure Data Lake Analytics, and Azure Machine Learning.

MapReduce is a programming paradigm that enables massive scalability across hundreds or thousands of servers in a Hadoop cluster. As the processing component, MapReduce is the heart of Apache Hadoop. The term "MapReduce" refers to the two separate and distinct tasks that Hadoop programs perform: the map task and the reduce task.

22) The integration runtime is the compute infrastructure that Azure Data Factory uses to provide the following data integration capabilities across various network environments.

a)       Self Hosted IR

b)      Azure IR

c)       Azure-SSIS IR

  • Datasets: Sources of data. In simple words, it is a data structure that holds our data.
  • Linked services: These store the connection information that Data Factory needs to connect to external data sources.

23)   In Scala, a val cannot be reassigned, whereas a var can be reassigned.

How to Handle Corrupt Records in Databricks:

Creating the DataFrame using DROPMALFORMED mode

Here we are creating the dataframe by reading the file. By using the schema() function, we can define our schema while reading the file's contents. Here we are using DROPMALFORMED mode while reading the file. This mode populates only the clean records without throwing any error, and the corrupted records are discarded during the creation of the dataframe.

The file contains only five records, out of which four are cleaned, and one is corrupted. In the below result set, if you observe, the dataframe contains only four cleaned records, and the bad record is not populated.

val empDF = spark.read
.schema("id Integer,name String,doj Date,salary Integer")
.option("header",true)
.option("mode", "DROPMALFORMED") FailFast”
.option("dateformat","dd.MM.yyyy")
.csv("/FileStore/tables/empdata.txt")
display(empDF)

val empDF = spark.read
.schema("id Integer,name String,doj Date,salary Integer,_corrupt_record String")
.option("header",true)
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.option("dateformat","dd.MM.yyyy")
.csv("/FileStore/tables/empdata.txt")
display(empDF)

 

Step 4: Filter Clean Records

For clean records, the "_corrupt_record" column is populated with null values. Here we are filtering only the clean records by keeping the col("_corrupt_record").isNull condition in the filter function and thereby dropping the "_corrupt_record" column.

import org.apache.spark.sql.functions._
//Filter the cleaned records
empDF.filter(col("_corrupt_record").isNull).drop(col("_corrupt_record")).show()

ADLS VS BLOB STORAGE:

Structure
Blob: Flat namespace object store.
ADLS: Hierarchical namespaces (much like a File System).

Purpose
Blob: General purpose object store for a wide variety of storage scenarios, including big data analytics.
ADLS: Optimized storage for big data analytics workloads.

Performance (Analytics Workload)
Blob: Good storage retrieval performance.
ADLS: Better storage retrieval performance.

Cost
Blob: High cost for Analysis.
ADLS: Low cost for Analysis.

Hierarchical namespaces organize blob data into directories and stores metadata about each directory and the files within it. They keep the data organized, which yields better storage and retrieval performance for an analytical use case and lowers the cost of analysis. This structure allows operations, such as directory renames and deletes, to be performed in a single atomic operation.
Flat namespaces, by contrast, require several operations proportionate to the number of objects in the structure.

PYSPARK SERIALIZATION:

Serialization is used for performance tuning on Apache Spark. All data that is sent over the network or written to the disk or persisted in the memory should be serialized. Serialization plays an important role in costly operations.

MarshalSerializer

Serializes objects using Python’s Marshal Serializer. This serializer is faster than PickleSerializer, but supports fewer datatypes.

class pyspark.MarshalSerializer

PickleSerializer

Serializes objects using Python’s Pickle Serializer. This serializer supports nearly any Python object, but may not be as fast as more specialized serializers.
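
A minimal sketch of choosing a serializer when creating the SparkContext (local master; the app name is arbitrary):

# Create a SparkContext that uses MarshalSerializer instead of the default PickleSerializer
from pyspark import SparkContext
from pyspark.serializers import MarshalSerializer

sc = SparkContext("local", "serialization-demo", serializer=MarshalSerializer())
print(sc.parallelize(range(1000)).map(lambda x: x * 2).take(5))
sc.stop()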

 

PySpark - SparkContext

 

SparkContext is the entry point to any spark functionality. When we run any Spark application, a driver program starts, which has the main function and your SparkContext gets initiated here. The driver program then runs the operations inside the executors on worker nodes.

SparkContext uses Py4J to launch a JVM and creates a JavaSparkContext. By default, PySpark has SparkContext available as ‘sc’, so creating a new SparkContext won't work.

class pyspark.SparkContext (
   master = None,
   appName = None, 
   sparkHome = None, 
   pyFiles = None, 
   environment = None, 
   batchSize = 0, 
   serializer = PickleSerializer(), 
   conf = None, 
   gateway = None, 
   jsc = None, 
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

·         master: The URL of the cluster it connects to.

·         appName: Name of your job.

·         sparkHome: Spark installation directory.

·         pyFiles: The .zip or .py files to send to the cluster and add to the PYTHONPATH.

·         environment: Worker node environment variables.

·         batchSize: The number of Python objects represented as a single Java object. Set 1 to disable batching, 0 to automatically choose the batch size based on object sizes, or -1 to use an unlimited batch size.

·         serializer: RDD serializer.

·         conf: An object of L{SparkConf} to set all the Spark properties.

·         gateway: Use an existing gateway and JVM, otherwise initialize a new JVM.

·         jsc: The JavaSparkContext instance.

·         profiler_cls: A class of custom Profiler used to do profiling (the default is pyspark.profiler.BasicProfiler).

RDD stands for Resilient Distributed Dataset, these are the elements that run and operate on multiple nodes to do parallel processing on a cluster. RDDs are immutable elements, which means once you create an RDD you cannot change it. RDDs are fault tolerant as well, hence in case of any failure, they recover automatically. You can apply multiple operations on these RDDs to achieve a certain task.

To apply operations on these RDDs, there are two ways:

·         Transformation and

·         Action
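
A minimal sketch of the two kinds of operations (assumes an existing SparkContext sc):

rdd = sc.parallelize([1, 2, 3, 4, 5])

squared = rdd.map(lambda x: x * x)             # transformation: returns a new RDD, nothing executes yet
evens = squared.filter(lambda x: x % 2 == 0)   # transformation

print(evens.collect())    # action: triggers the computation, returns [4, 16]
print(squared.count())    # action: returns 5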

 

For parallel processing, Apache Spark uses shared variables. A copy of shared variable goes on each node of the cluster when the driver sends a task to the executor on the cluster, so that it can be used for performing tasks.

There are two types of shared variables supported by Apache Spark

·         Broadcast: Broadcast variables are used to save the copy of data across all nodes. This variable is cached on all the machines and not sent on machines with tasks. 



·         Accumulator: Accumulator variables are used for aggregating the information through associative and commutative operations.



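A minimal sketch of both shared-variable types (assumes an existing SparkContext sc):

# Broadcast: read-only lookup cached on every node; Accumulator: write-only counter updated by tasks
lookup = sc.broadcast({"IN": "India", "US": "United States"})
bad_rows = sc.accumulator(0)

def to_country(code):
    if code not in lookup.value:
        bad_rows.add(1)
        return "Unknown"
    return lookup.value[code]

rdd = sc.parallelize(["IN", "US", "XX"])
print(rdd.map(to_country).collect())   # ['India', 'United States', 'Unknown']
print(bad_rows.value)                  # 1
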
PYSPARK CONF

To run a Spark application on the local/cluster, you need to set a few configurations and parameters, this is what SparkConf helps with. It provides configurations to run a Spark application. The following code block has the details of a SparkConf class for PySpark.

from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)

 

 

PYSPARK SPARK FILES:

In Apache Spark, you can upload your files using sc.addFile (sc is your default SparkContext) and get the path on a worker using SparkFiles.get. Thus, SparkFiles resolve the paths to files added through SparkContext.addFile().

SparkFiles contain the following classmethods

·         get(filename)

·         getRootDirectory()
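
A minimal sketch (assumes an existing SparkContext sc; the file path is hypothetical):

from pyspark import SparkFiles

sc.addFile("/dbfs/FileStore/tables/lookup.csv")   # distribute a file to every worker
print(SparkFiles.get("lookup.csv"))               # local path of that file on the current node
print(SparkFiles.getRootDirectory())              # directory holding all files added via addFile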

PYSPARK STORAGE LEVEL:

StorageLevel decides how RDD should be stored. In Apache Spark, StorageLevel decides whether RDD should be stored in the memory or should it be stored over the disk, or both. It also decides whether to serialize RDD and whether to replicate RDD partitions.
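
A minimal sketch in PySpark (assumes an existing DataFrame df):

from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)   # keep in memory, spill to disk if it does not fit
df.count()                                 # an action materializes the persisted data
df.unpersist()                             # release it when no longer needed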

PYTHON LAMBDA

A lambda function is a small anonymous function. A lambda function can take any number of arguments, but can only have one expression.

x = lambda a : a + 10
print(x(5))

The power of lambda is better shown when you use them as an anonymous function inside another function.

Say you have a function definition that takes one argument, and that argument will be multiplied with an unknown number:

def myfunc(n):
  return lambda a : a * n

mydoubler = myfunc(2)
mytripler = myfunc(3)

print(mydoubler(11))
print(mytripler(11))

 

What is Apache Parquet?

Apache Parquet is a file format designed to support fast data processing for complex data, with several notable characteristics:

Columnar: Unlike row-based formats such as CSV or Avro, Apache Parquet is column-oriented – meaning the values of each table column are stored next to each other, rather than those of each record:



Open-source: Parquet is free to use and open source under the Apache Hadoop license, and is compatible with most Hadoop data processing frameworks.

Self-describing: In addition to data, a Parquet file contains metadata including schema and structure. Each file stores both the data and the standards used for accessing each record – making it easier to decouple services that write, store, and read Parquet files.
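
A minimal sketch of a Parquet round trip in PySpark (assumes an existing DataFrame df; the path is a placeholder):

df.write.mode("overwrite").parquet("/mnt/abc2/events_parquet")

events = spark.read.parquet("/mnt/abc2/events_parquet")
events.printSchema()   # the schema comes from the file's own metadata (self-describing)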

SQL SPLITTER:

-- Assumes @string_value, @delimiter_character and the @result_set table variable are declared earlier
DECLARE @start_position INT,
        @ending_position INT
SELECT @start_position = 1,
       @ending_position = CHARINDEX(@delimiter_character, @string_value)
WHILE @start_position < LEN(@string_value) + 1
BEGIN
    IF @ending_position = 0
        SET @ending_position = LEN(@string_value) + 1
    INSERT INTO @result_set (splited_data)
    VALUES (SUBSTRING(@string_value, @start_position, @ending_position - @start_position))
    SET @start_position = @ending_position + 1
    SET @ending_position = CHARINDEX(@delimiter_character, @string_value, @start_position)
END

Audit Log Table: https://www.youtube.com/watch?v=ZdeatJidrAo

# Pivot the exploded key/value pairs: one column per key (using first() as the aggregate)
from pyspark.sql.functions import first
pivot_df = explode_df.groupBy("operation").pivot("key").agg(first("value"))

display(pivot_df)

 

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()

print(spark.sparkContext.version)

df1 = spark.createDataFrame(data=employ_data, schema=employ_schema)

 

Narrow Transformation: filter is a narrow transformation (1:1 transformation, no shuffle)

df2 = df1.filter(df1.employeeid == 200)

Wide Transformation: groupBy is a wide transformation (many:1 transformation, usually involves a shuffle)

df2 = df1.groupBy("dept").count()

Z-ordering is a technique to colocate related information in the same set of files. This co-locality is automatically used by Delta Lake on Databricks data-skipping algorithms. This behavior dramatically reduces the amount of data that Delta Lake on Databricks needs to read. To Z-order data, you specify the columns to order on in the ZORDER BY clause:

SQL


OPTIMIZE events
WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType)

 

 

Repartition: It can be used to increase or decrease the number of partitions. It always shuffles the data before creating the new partitions. It is used for performance optimization.

Coalesce: It can only reduce the number of partitions. Coalesce is more performant than repartition because it avoids a full shuffle, but it can create uneven partitions.

sc.defaultParallelism created 8 partitions by default

df.rdd.getNumPartitions()
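
A minimal sketch comparing the two (assumes an existing DataFrame df):

print(df.rdd.getNumPartitions())           # e.g. 8

df_wide = df.repartition(16)               # full shuffle, can increase or decrease
print(df_wide.rdd.getNumPartitions())      # 16

df_narrow = df.coalesce(2)                 # no full shuffle, can only decrease
print(df_narrow.rdd.getNumPartitions())    # 2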

Cache VS Persist:

The only difference between cache() and persist() is that cache() saves intermediate results with the default storage level only, while persist() lets us save the intermediate results in any of 5 storage levels (MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY).

import org.apache.spark.storage.StorageLevel

val rdd2 = rdd1.persist(StorageLevel.MEMORY_ONLY)

By default the storage level for an RDD is MEMORY_ONLY.

MEMORY_ONLY_2: it will replicate each partition onto two cluster nodes.

 

What is the Catalyst optimizer? An optimizer that automatically finds the most efficient plan to execute the data operations specified in the user's program. It "translates" the transformations used to build the Dataset into a physical plan of execution.
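
You can inspect the plans Catalyst produces with explain(). A minimal sketch (assumes an existing DataFrame df with name and salary columns):

# Prints the parsed, analyzed and optimized logical plans plus the physical plan
df.filter(df.salary > 1000).select("name", "salary").explain(extended=True)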

 

df = spark.createDataFrame(data=simple_data, schema=columns)

 

 

NTILE(N) SQL RANK function

We use the NTILE(N) function to distribute the number of rows in the specified (N) number of groups. Each row group gets its rank as per the specified condition. We need to specify the value for the desired number of groups.

3rd Highest Salary

select distinct salary from employee a where 3 = (select count(distinct salary) from employee b where a.salary <= b.salary);
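
The same ideas in PySpark with window functions; a minimal sketch (assumes an employee DataFrame emp_df with a salary column):

from pyspark.sql.window import Window
from pyspark.sql.functions import ntile, dense_rank, col

w = Window.orderBy(col("salary").desc())

emp_df.withColumn("quartile", ntile(4).over(w)).show()                          # distribute rows into 4 ranked groups
emp_df.withColumn("rnk", dense_rank().over(w)).filter(col("rnk") == 3).show()   # rows with the 3rd highest salary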

 

Why choose a tabular solution:

  • easier for understanding and creating the model;
  • works quicker than multidimensional cubes for queries based on columns;
  • hardware, such as disks, is not essential. However, tabular is a memory dependent solution, and more memory will ensure better performance;
  • more efficient data compression: about one-tenth of the original size, whereas compressed multidimensional data takes up a third of the size of the original database

Why choose a multidimensional solution

  • works better with a large amount of data – when we are talking about terabytes, it's better to go with the multidimensional database. If your database requires more than five terabytes, multidimensional is the only option.
  • performs better in terms of scalability
  • some features, such as aggregations or actions, are supported in the multidimensional model only

 

Clustered ColumnStore Index

The index can be created during the creation of the table. Unlike row-level indexes, columnstore indexes do not require the column names in the index definition of the corresponding table. Clustered columnstore indexes operate on the entire table, at the data page level. In general, clustering means arranging data in a specific order, so with row-level indexes we sort the rows by the indexed column or columns specified in the particular index, but with a columnstore index the data is organized column by column.

With the same size of keys, nonclustered indexes need more space than clustered indexes.

 

 

Logging level in ADF Data flow:

Verbose: Shows the partition info and row output for every transformation.

Basic: Does not show the partition info.

None: Only shows the rows copied from source to sink.

 

 

MapReduce is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster.

A MapReduce program is composed of a map procedure, which performs filtering and sorting (such as sorting students by first name into queues, one queue for each name), and a reduce method, which performs a summary operation (such as counting the number of students in each queue, yielding name frequencies). The "MapReduce System" (also called "infrastructure" or "framework") orchestrates the processing by marshalling the distributed servers, running the various tasks in parallel, managing all communications and data transfers between the various parts of the system, and providing for redundancy and fault tolerance.
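
The same pattern expressed with PySpark RDD operations; a minimal sketch (assumes an existing SparkContext sc; the file path is a placeholder):

lines = sc.textFile("/mnt/abc2/book.txt")

counts = (lines.flatMap(lambda line: line.split())    # map phase: split lines into words
               .map(lambda word: (word, 1))           # emit (word, 1) pairs
               .reduceByKey(lambda a, b: a + b))      # reduce phase: sum the counts per word

print(counts.take(10))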

 

 

Read Excel File In Databricks:

df = spark.read.format("com.crealytics.spark.excel") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("dataAddress", "Sheet3!") \
    .load("filepath")

 

Relational Database vs NoSQL Database

Relational: Data is stored in tables.
NoSQL: Data can be stored as documents, graphs, key-value pairs, etc.

Relational: Vertically scalable.
NoSQL: Horizontally scalable.

Relational: Predefined schema.
NoSQL: No predefined schema, hence easier to update.

Relational: Supports a powerful query language.
NoSQL: Supports a simple query language.

Relational: Can handle data in moderate volumes.
NoSQL: Can handle data in very high volumes.

Relational: Has a centralised structure.
NoSQL: Has a decentralised structure.

Relational: Data can be written from one or a few locations.
NoSQL: Data can be written from many locations.

 

 

Azure Cosmos DB

Cosmos Database (DB) is a globally distributed, low latency, multi-model database for managing data at large scales. It is a cloud-based NoSQL database offered as a PaaS (Platform as a Service) from Microsoft Azure. It is a highly available, high throughput, reliable database and is often called a serverless database. Cosmos database contains the Azure Document DB and is available everywhere.

 

How does Azure Cosmos DB work?

If a website used by people all over the world writes its data into a primary database in one location (non-multi-master mode), the people near to the location will be able to retrieve the data faster than the rest of the world due to network latency issues.

But Cosmos DB has multi-master support where the data can be simultaneously written into different databases spread out globally. In this way, the data is replicated onto the user’s nearest region so that it can be accessed faster. But there can still be a difference of milliseconds between the data being replicated and this affects the consistency.

Consistency indicates whether the data are in sync and are at the same state at any given point in time. Cosmos DB offers multiple levels of consistency with varying performances and availability. 


df1.coalesce(1).write.mode("overwrite").parquet("/dbfs/filestore/")

 

Default partition size: 128 MB in Spark (Spark creates 1 partition for every 128 MB of data)

spark.conf.get("spark.sql.files.maxPartitionBytes")

 

Managed Table: Spark manages both the metadata and the data. Databricks takes care of both the metadata and the storage location and stores the data in its account. Dropping the table removes both.

 

 

Unmanaged (external) table: Spark only manages the metadata, not the data. The data location is controlled by you. Dropping the table will only delete the metadata, not the data.
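
A minimal sketch of both, using Spark SQL from a notebook (the table names and location are placeholders):

spark.sql("CREATE TABLE managed_emp (id INT, name STRING)")        # managed: data lives in the metastore-managed location

spark.sql("""
    CREATE TABLE external_emp (id INT, name STRING)
    USING DELTA
    LOCATION '/mnt/abc2/external_emp'
""")                                                               # unmanaged: data stays at the external location

spark.sql("DROP TABLE managed_emp")     # removes metadata and data
spark.sql("DROP TABLE external_emp")    # removes metadata only; the files remain at /mnt/abc2/external_emp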

Count IPs from the JSON list:

import json
import pandas as pd

ip_dict2 = dict()

for j in samplejson:
    temp_dict = json.loads(j[0])
    for ip in temp_dict['ips']:
        if ip in ip_dict2.keys():
            ip_dict2[ip] += 1
        else:
            ip_dict2[ip] = 1

print(pd.DataFrame(ip_dict2.items(), columns=['ip', 'count']))

 

Sort By: It sorts the data at the partition level. Less efficient for a total ordering, but it does not shuffle, so it saves cost.

Order By: It sorts the complete data irrespective of partitions. It will shuffle.

df.orderBy('country', 'salary')
  

The map function is a transformation and applies to every element of the RDD.

mapPartitions is a transformation and applies to every partition of the RDD.

rdd.mapPartitions(f, preservesPartitioning=False) is mainly used when reformatting a whole partition at a time.

rdd2 = df.rdd.map(lambda x: (x[0] + ',' + x[1], x[2], x[3] * 2))

df2 = rdd2.toDF()

 

 

Count Words in an eBook:

Explode will put each word into its own row, which is easier to work with.

Scenario 2: To check the data skewness and count the words in a particular partition, use spark_partition_id.



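A minimal sketch of both steps (the file path is a placeholder):

from pyspark.sql.functions import split, explode, col, spark_partition_id

lines_df = spark.read.text("/mnt/abc2/book.txt")

# Explode each line into one word per row, then count
words_df = lines_df.select(explode(split(col("value"), r"\s+")).alias("word"))
words_df.groupBy("word").count().orderBy(col("count").desc()).show(10)

# Scenario 2: how many words landed in each partition (reveals skew)
words_df.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count().show()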
 

Types of Join Strategy in Spark:

1)      Broadcast Hash Join: When one of the data frames is small and fits in memory, it will be broadcast to all the executors, and a hash join will be performed (see the sketch after this list).



2)      Shuffle Hash Join: It happens only when sort-merge join is disabled.



3)      Shuffle Sort Merge Join: The default strategy when both data frames are large; both sides are shuffled on the join key, sorted, and then merged.

4)      Cartesian Join

5)      Broadcasted Nested Loop Join

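A minimal sketch of forcing a broadcast hash join with a hint (orders_df and dim_country_df are placeholder DataFrames):

from pyspark.sql.functions import broadcast

joined = orders_df.join(broadcast(dim_country_df), on="country_code", how="inner")
joined.explain()   # the physical plan should show BroadcastHashJoin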
 

A view is nothing but a virtual table which takes the output of the query and it can be used in place of tables.

A materialized view is nothing but an indirect access to the table data by storing the results of a query in a separate schema.

A conformed fact is a fact that can be used across multiple data marts in combination with multiple fact tables.

 

What are the types of Dimensions in Dimensional Modeling?

Following are the types of Dimensions in a Data Warehouse:

  • Conformed Dimension
  • Outrigger Dimension
  • Shrunken Dimension
  • Role-playing Dimension
  • Dimension to Dimension Table
  • Junk Dimension
  • Degenerate Dimension
  • Swappable Dimension
  • Step Dimension

 

 

Count all the rows in all the tables

SELECT
    [TableName] = so.name,
    [RowCount] = MAX(si.rows)
FROM
    sysobjects so,
    sysindexes si
WHERE
    so.xtype = 'U'
    AND
    si.id = OBJECT_ID(so.name)
GROUP BY
    so.name
ORDER BY
    2 DESC

 

Get a comma-separated list of all columns in a table:

Select TABLE_SCHEMA, TABLE_NAME
, Stuff(
  (
  Select ',' + C.COLUMN_NAME
  From INFORMATION_SCHEMA.COLUMNS As C
  Where C.TABLE_SCHEMA = T.TABLE_SCHEMA
      And C.TABLE_NAME = T.TABLE_NAME
  Order By C.ORDINAL_POSITION
  For Xml Path('')
  ), 1, 1, '') As Columns
From INFORMATION_SCHEMA.TABLES As T
WHERE T.TABLE_NAME='TableName'

 

List of all databases

Select Name from dbo.sysdatabases

select @@language AS DefaultLanguage
select @@SERVERNAME AS ServerName

 

List of tables without a Primary Key:

SELECT SCHEMA_NAME(schema_id) AS SchemaName, name AS TableName
FROM sys.tables
WHERE OBJECTPROPERTY(OBJECT_ID,'TableHasPrimaryKey') = 0
ORDER BY SchemaName, TableName

http://www.codingfusion.com/Post/75-Important-queries-in-SQL-Server-every-developer-should-know

Get ID of latest Record:

Select scope_Identity()

List of Keys:

SELECT DISTINCT
    Constraint_Name AS [ConstraintName],
    Table_Schema AS [Schema],
    Table_Name AS [Table Name]
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE INFORMATION_SCHEMA.KEY_COLUMN_USAGE.TABLE_NAME='TableName'

 

 

Service Principal vs Managed Identity

A Service Principal can be looked at as similar to a service account in a more traditional on-premises application or service scenario. Managed Identities are used for "linking" a Service Principal security object to an Azure resource like a Virtual Machine, Web App, Logic App or similar. For a 1:1 relation between both, you would use a System Assigned Managed Identity, whereas for a 1:many relation, you would use a User Assigned Managed Identity.

 

 

1.     df.show() : It will show only the content of the dataframe.

show() signature:

df.show(n=20, truncate=True, vertical=False)

2.    df.collect() : It returns the entire content of the dataframe to the driver as a list of Row objects.

3.    df.take() : Returns the content for a limited number of rows, which is safer than collect() on a very large dataset. It takes 1 parameter, e.g. df.take(5).

 

 

# Note: "escape" is set twice here; the second value (':') overrides the first ('\\')
df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .option("delimiter", ",") \
    .option("escape", '\\') \
    .option("escape", ':') \
    .option("parserLib", "univocity") \
    .option("multiLine", "true") \
    .load("file.csv")

 

Load Parquet File in Dataframe:

To read a Parquet file into a Pandas DataFrame, you can use the pd.read_parquet() function. The function allows you to load data from a variety of different sources.

import pandas as pd

url = "<parquet-file-path-or-url>"

df = pd.read_parquet(url)

print(df.head())

 

 

 

df1.join(other=df2, on=commoncolumn, how='inner').select(df1.fieldname, df2.fieldname).show()

df1.union(df2).show()

 

sum_df1 = df1.join(other=df2, on='name', how='inner').select(df1.name, df1.location, df1.quantity, df2.price)

 

from pyspark.sql.functions import col, sum as _sum

sum_df1 = sum_df1.withColumn('revenue', col('quantity') * col('price'))
sum_df1.groupBy('location').agg(_sum('revenue').alias('revenue')).show()

 

Reading a JSON file:

dbutils.fs.put("<target-file-path>", home_json, True)

spark.read.option("multiline", "true").json("filelocation")

Create a temp table to store the array; explode puts each array element in its own row:

from pyspark.sql.functions import explode
exploded_df = df.select(explode("columnName").alias("columnname"))

 

 

dff = spark.read.option("header", "true").option("inferSchema", "true").option("delimiter", "|").csv("data.txt")

from pyspark.sql.functions import regexp_replace

# Strip the '^' characters that wrap both the column names and the values
dffs_headers = dff.dtypes
for i in dffs_headers:
    columnLabel = i[0]
    newColumnLabel = columnLabel.replace('^', '')                                       # clean the column name
    dff = dff.withColumn(newColumnLabel, regexp_replace(columnLabel, '^\\^|\\^$', ''))  # strip leading/trailing '^' from the values
    if columnLabel != newColumnLabel:
        dff = dff.drop(columnLabel)

display(dff)

 


 



 

What are the differences between Azure Synapse and Data Factory?

To begin with, Azure Synapse is an analytics service, while Data Factory is a hybrid data integration service. Synapse brings together data warehousing and big data analysis. Data Factory simplifies ETL at scale. Azure Data Factory deployments allow developers to integrate disparate data sources, but Synapse offers a unified experience to manage, prepare, and serve data for BI and ML needs. Data Factory lacks reporting applications like Power BI and needs improvement in terms of speed and time. Whereas, in the case of Synapse, integration with 3rd party tools is difficult; it lacks adequate SQL support.

 

val schema = ""

val df = spark.read.format("csv").option("header", "true").schema(schema).load("filepath")

 

 

data = ""

columns = ["Name", "Age"]

df = spark.createDataFrame(data=data, schema=columns)

 

emp_df.join(dept_df, emp_df.dept_id == dept_df.dept_id, how="inner").show(truncate=False)