Wednesday, July 6, 2016

Spark Core Advanced concepts Cache, Accumulator, Java in Spark


Advanced Concepts in Spark Core

1. KeyValue Format
2. Cache - Speed up the data while persisting
3. Accumulator
4. Java

How to create a implicit class

Ex: If we try to use - 1.plus(1) – we will get error

error: value plus is not a member of Int
1.plus(1)

To avoid the error we can write custom implicit conversion
case class IntExtensions(value:Int){
def plus(operand:Int) = value+operand
}


The above code is not looking good. So we will use implicit conversions.
scala>import scala.language.implicitConversions

scala> import scala.language.implicitConversions
import scala.language.implicitConversions

scala> implicit def intToIntExtensions(value:Int)={
     | IntExtensions(value)
     | }
intToInteExtensions: (value: Int)IntExtensions

1. We are using implicit to convert our integer value
If we use scala> 1.plus(1) – result will be 2, Since internally conversion happened.
Scala > 1.plus(1) is nothing but intToIntExtensions(1).plus(1)
RDD Implicits
1. doubleRDDToDoubleRDDFunctions(rdd:RDD[Double]): DoubleRDDFunctions
2. numericRDDToDoubleRDDFunctions[T](rdd:RDD[T]): DoubleRDDFunctions
3. rddTOAsyncRDDActions[T](rdd:RDD[t]): AsyncRDDActions[T]
4. rddToOrderedRDDFunctions[K,V](rdd:RDD[(K,V)]: OrderedRDDFunctions
5. rddToPairRDDFunctions[K,V](rdd:RDD[(K,V)]: PairRDDFunctions
6. rddToSequenceFileRDDFunctions[K,V](rdd:RDD[(K,V)]: SequenceFileRDDFunctions

If we are using older versions then we have to import rdd’s from Spark Context using import.SparkContext._
Pairs


If we see the above data we will have same key and different data for few of the keys.
But data stored in different nodes like below



If we see the above tables data, same key and values will be stored in same node. So it easy to us to pair the key and values.
Pair Methods
1. collectAsMap – which does the same thing as collect and it can return as map and also we can extract only keys or values or we can lookup is to get sequence of values for given key
- keys/values/lookup
2. mapValues -same as values
3. flatMapValues- same as values
4. reduceByKey- it is same as key. But it is transformation instead of action. Since we are having all the keys in same machine, we can work with same worker instead of going back to driver.
5. foldByKey
6. aggregateByKey(0)(….[AGG FUNCTIONS]…)
7. combineByKey(x=>x*x)(….[AGG FUNCTIONS]…) - it accepts functions instead of static which aggregateByKey will accept
8. groupByKey – It is same as using group By method.
9. countByKey
10. countApproxDistinctByKey
11. sampleByKey
12. substractByKey
13. sortByKey – it is made for OrderedRDDFunctions

SQL-Like Pairings

1. join- RDD[K,(TLeft,TRight)]
2. innerjoin- left and right keys match
3. fulleOuterJoin – retrieve all the details which right key does not match(left,NULL) and (NULL,right) left key does not match and left and right keys matched values (left,right)
4. left join- all the records from left table and matched values from the right table (left,NULL) and (left,right)
5. rightJoin-opposite of leftJoin
6. cogroup/groupWith-


rddToPairRDDFunctions[K,V](rdd:RDD[(K,V)]: PairRDDFunctions

Pair Saving

1. saveAS(NewAPI)HadoopFile
- path
- keyClass
- valueClass
- outputFormatClass: outputFormat
2. saveAS(NewAPI)HadoopDataSet
- conf

3. saveAsSequenceFile – available in Scala and python
- saveAsHadoopFile(path,keyClass,valueClass,sequencFileOutputFormat)
1. Cache
1.It’s ability to store intermediate data in memory while keeping distributed that allows possibility of a 100 times performance gain when compared to Hadoop.
2.It is helpful in Machine learning
Ex: How it is useful
3.Suppose we have 1 RDD, use Thread.sleep , performance algorithm to get the result and create one more transformation and final result will be given below.
4.We can Cache or persist , before calling action
5.We can use the in memory or disk to persist the RDD

If we want to run another transformation , then it will call from cache.




a. Cache/persist
- Org.apache.spark.storage.StorageLevel.MEMORY_ONLY- it is default option and it means data will be cached into memory .
b. Persist
- MEMORY_ONLY
- MEMORY_AND_DISK
- DISK_ONLY
- MEMORY_ONLY_SER- if any memory issue we can serialize the data.
- MEMORY_AND_DISK_SER
- …_2 – all these options appended with _2 , So the data will be replicated with another worker.
- This required to computation upon failure.as the DAG scheduler opt’s for the alternate storage location, which will re replicate the data.
- OFF_HEAP- which will store the data in memory of the JVM Heap.
- NONE – the default storage for any RDD.
c. Unpersist(blocking:Boolean=true) – to clear the cache.- No need to call explicitly once RDD is out of scope then automatically clear the cache.

Accumulator
 1. We have seen so many methods either transform or action on RDD.
 2. Accumulator is nothing but to accumulating the values for the shared variables across all clusters.
How to create Accumulator?
val accumulator = sc.accumulator(0,”Accumulator Name”):Accumulator[Int]
 it will accept  Int , Double, Float and Long
ex:
     rdd.foreach(x=>{ 

          doSomethingWith(x) // Action Methods
          accumulator +=1   // it is same like java to increment the value. And called as Worker

      }

val  accumulatedValue = accumulator.value // This is final accumulated value called as Driver

If any worker goes down, then already accumulated value will be there with Driver. If again new worker start and picks the down node, then again it will accumulate the value. It is mismatch. For this we have to find the error accumulated.

Rdd.foreach(x=>{
   Try{
   }
Catch({
      Case _=> errorCounterAccumulator+=1;  // This will useful, do we need to store accumulate the value in case of error or not.
}
}

Java in Spark

In Java 8 we have Lambdas. But before that we have no lambda expressions. For this we need to write different syntax in Spark RDD.

Ananymous inner class
JavaRDD.[INSERT METHOD OF CHOICE]{
   new Function(){
      Public TOut call(TIn value){
      //process(value) -> TOut
      }
   }
}
Functions available in Java


The above all methods available in org.apache.spark.api.java.function
Java does not have implicits, we need to create manually

JavaPairRDD mapToPair (PairFunction)

Thank you very much for viewing this post.

AddToAny

Contact Form

Name

Email *

Message *