Spark

A cluster computing framework. Whereas Hadoop MapReduce writes intermediate data to disk after each job, Spark keeps computation in memory and can be up to 100x faster than Hadoop.

Advantages

(Figure: comparison with Hadoop)

Architecture

The basic programming abstraction is the Resilient Distributed Dataset (RDD), a fault-tolerant collection of data that can be operated on in parallel.

A Spark application runs as a set of independent processes on the cluster, coordinated by the SparkContext object in the main program (the driver program).

  • When the application is submitted, the driver creates a SparkContext, which registers with the Cluster Manager and requests resources to run Executors.
  • Once resources are granted, the Executors start and report their status to the Cluster Manager.
  • The SparkContext builds a DAG from the RDD dependencies; the DAGScheduler parses the DAG into Stages (task sets), computes the dependencies between Stages, and hands the Stages to the TaskScheduler.
  • Executors ask the SparkContext for tasks; the TaskScheduler dispatches tasks to the Executors, and the SparkContext ships the application code to them.
  • Tasks run on the Executors and report back to the TaskScheduler and DAGScheduler. When everything finishes, results are written out and resources are released (a minimal driver sketch follows).
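
A minimal driver-program sketch of this flow. The master URL below is an illustrative placeholder (use local[*] to run the same program on a single machine); resource negotiation, stage/task scheduling, and code shipping all happen behind the SparkContext:

from pyspark import SparkConf, SparkContext

# Placeholder master URL for a standalone cluster; swap in local[*] for a single machine.
conf = SparkConf().setAppName("driver-demo").setMaster("spark://host:7077")
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(10))         # the driver only records the RDD lineage here
print(rdd.map(lambda x: x * x).sum())   # the action triggers DAG -> stages -> tasks on the Executors

sc.stop()                               # release Executors and other cluster resources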

RDD

Benefits

Structure

An RDD is a read-only collection of partitioned records. One RDD has multiple partitions (fragments of the dataset), and different partitions can be stored on different nodes of the cluster and computed in parallel.

An RDD cannot be modified in place; it can only be created in two ways (a short sketch follows):

  • Create an RDD from data in physical (stable) storage.
  • Create a new RDD by transforming an existing RDD.
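
A minimal sketch of both creation paths; the file path is a placeholder:

from pyspark import SparkContext

sc = SparkContext(master="local[2]", appName="rdd-create")

# 1. Create an RDD from data in stable storage (placeholder path).
lines = sc.textFile("file:///tmp/word.txt")

# 2. Create a new RDD by transforming an existing one; the original RDD is not modified.
upper = lines.map(lambda line: line.upper())

sc.stop()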

transformations and actions

  • Transformations (map, filter, groupBy, join) take an RDD and return an RDD.
  • Actions (count, collect) take an RDD and return a value or a result to the driver.

Transformations are not computed immediately; Spark only records the transformations applied to the base dataset (the RDD dependency graph). Only when an action is called does Spark build a DAG from those dependencies, compute the result, and return it to the driver program.

This avoids waiting for data synchronization between successive transformations and removes the concern about large volumes of intermediate data: the DAG's topological order chains the RDD operations together into a pipeline.

By default every action recomputes from scratch, even when the same transformations have already been executed. The persistence (caching) mechanism avoids this repeated computation. persist() marks an RDD as persistent; it is only a mark because the RDD is not computed and persisted at the point where persist() appears. Only when the first action triggers the real computation is the result persisted; after that, the RDD stays in memory on the compute nodes and is reused by subsequent actions.

>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> rdd.cache()  # marks the RDD for persist(MEMORY_ONLY); nothing is cached yet because the RDD has not been computed
>>> print(rdd.count())  # the first action triggers a full computation and, because of rdd.cache(), stores the RDD in the cache
3
>>> print(','.join(rdd.collect()))  # the second action reuses the cached RDD instead of recomputing from scratch
Hadoop,Spark,Hive
from pyspark import SparkContext

def t_map():
    data = [1, 2, 3, 4, 5]
    with SparkContext() as sc:
        rdd = sc.parallelize(data)
        gl = rdd.map(lambda x: x * 2)  # transformation: nothing is computed yet
        col = gl.collect()             # action: triggers the computation
        print(col)                     # [2, 4, 6, 8, 10]

Why it is efficient

  • High fault tolerance

    Lost data is recovered by recomputing it from the DAG (lineage), which avoids the high cost of data replication; the recomputation itself runs in parallel on multiple nodes.

  • Intermediate results are persisted in memory.
  • Data is stored as Java objects, reducing serialization and deserialization overhead.

RDD dependencies

Viewing RDD partitions

Setting partitions: as a rule of thumb, the number of partitions should be roughly equal to the number of CPU cores in the cluster.

>>> array = [1,2,3,4,5]
>>> rdd = sc.parallelize(array,2) # create the RDD with two partitions
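
To confirm how an RDD was actually split up, getNumPartitions() can be checked; continuing the example above:

>>> rdd.getNumPartitions()
2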

Splitting the DAG into stages

The DAG is parsed backwards: every wide dependency breaks the graph into a new stage, while a narrow dependency adds the current RDD to the current stage, so that as many narrow dependencies as possible end up in the same stage.
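
A quick way to see the resulting stage boundary is toDebugString(), assuming an existing SparkContext sc: map is a narrow dependency and stays in the same stage, while reduceByKey introduces a wide dependency (a shuffle) and starts a new one.

pairs = sc.parallelize(["a", "b", "a"]).map(lambda w: (w, 1))  # narrow dependency: same stage
counts = pairs.reduceByKey(lambda a, b: a + b)                 # wide dependency: shuffle, new stage
print(counts.toDebugString())                                  # the indented lineage shows the shuffle boundary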

Execution flow

RUN

Interactive

The master parameter

  • local Run Spark with one worker thread.
  • local[n] Run Spark with n worker threads.
  • spark://HOST:PORT Connect to a Spark standalone cluster.
  • mesos://HOST:PORT Connect to a Mesos cluster.
pyspark --master local[4]

run

run spark script

spark-submit --master local word_count.py
sc = SparkContext(master='local[4]')

py local

The RDD.glom() method returns a list of all of the elements within each partition, and the RDD.collect() method brings all the elements to the driver node.
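
A small sketch of the difference, assuming a local SparkContext named sc:

>>> nums = sc.parallelize(range(6), 3)
>>> nums.glom().collect()      # one inner list per partition
[[0, 1], [2, 3], [4, 5]]
>>> nums.collect()             # a flat list of all elements, gathered on the driver
[0, 1, 2, 3, 4, 5]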

get RDD from a collection

sc.parallelize

get RDD from a file

sc.textFile
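
A minimal sketch; the path is a placeholder, and textFile returns an RDD with one element per line:

>>> lines = sc.textFile("file:///tmp/word.txt")  # placeholder path
>>> lines.count()                                # an action: counts the lines in the file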

Difference between flatMap and map

from pyspark import SparkContext


def t_map():
    data = [[1, 2], [], [3, 4, 5]]
    with SparkContext() as sc:
        rdd = sc.parallelize(data)
        gl = rdd.map(lambda x: x)
        col = gl.collect()
        print(col)
"""
[[1, 2], [], [3, 4, 5]]
"""

def t_flatmap():
    data = [[1, 2], [], [3, 4, 5]]
    with SparkContext() as sc:
        rdd = sc.parallelize(data)
        gl = rdd.flatMap(lambda x: x)
        print(gl.collect())
"""
[1, 2, 3, 4, 5]
"""

 
def t_flatmap1():
    # apply the function first (x * 2 repeats each inner list), then flatten the results
    data = [[1, 2], [], [3, 4, 5]]
    with SparkContext() as sc:
        rdd = sc.parallelize(data)
        gl = rdd.flatMap(lambda x: x * 2)
        print(gl.collect())

"""
[1, 2, 1, 2, 3, 4, 5, 3, 4, 5]
"""

if __name__ == '__main__':
    t_map()
    t_flatmap()
    t_flatmap1()

print

for elem in rdd.take(100): print(elem)  # take() returns a Python list on the driver
rdd.foreach(print)                      # runs on the executors, so output appears in the executor logs

# collect() pulls all RDD elements from every worker node into the Driver Program,
# which can cause an out-of-memory error on large datasets.
for elem in rdd.collect(): print(elem)

Shared variables: broadcast variables and accumulators

  • Broadcast variables share a variable across the memory of all nodes.
  • Accumulators support aggregation across all the different nodes (for example counting or summing).

Broadcast variables

A broadcast variable lets the developer cache a read-only variable on each machine, instead of shipping a copy of it with every task on that machine.

  • When to use a broadcast variable

    Use one when tasks across multiple stages need the same data and the value does not change. It is loaded only when needed, and the data is sent only to the nodes whose executors require it.

    >>> broadcastVar = sc.broadcast([1, 2, 3])
    >>> broadcastVar.value
    [1, 2, 3]
    
  • How does broadcast work in Spark? The driver serializes the value and ships it to each executor once (using an efficient, BitTorrent-like broadcast mechanism); each executor caches it locally, and every task on that executor reads the same cached copy.

Accumulators

Tasks running on the cluster can use the add method to add values to an accumulator, but that is all they can do; they cannot read its value. Only the driver program can read an accumulator's value, via the value method.

>>> accum = sc.accumulator(0)
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.add(x))
>>> accum.value
10

Reading and writing files

Because Spark is lazy, pyspark does not report an error right away when a transformation contains a mistake; the real computation only starts when an action is executed, and that is when errors in the earlier transformations show up.

  • Local: file:///usr/local/spark/mycode/wordcount/writeback.txt

  • HDFS: hdfs://localhost:9000/user/hadoop/word.txt

>>> textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
>>> textFile = sc.textFile("/user/hadoop/word.txt")
>>> textFile = sc.textFile("word.txt")
from pyspark import SparkContext

PATH = 'hdfs://localhost:9000'


def main():
    with SparkContext(appName='sparkwordcount') as sc:
        # lazy evaluation: even if the path is wrong, no error is raised here
        input_file = sc.textFile(PATH + '/user/wyx/input/input.txt')
        counts = input_file.flatMap(lambda line: line.split()) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)
        # saveAsTextFile takes a directory path
        counts.saveAsTextFile(PATH + '/user/wyx/output')


if __name__ == '__main__':
    main()
>>> peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
>>> peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")


>>> peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json"
>>> peopleDF.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.json('file:///Users/wyx/project/py3.7aio/spark/a.json')
df.select(df.name.alias("username"), df.age).write.csv('file:///Users/wyx/project/py3.7aio/spark/a.csv',header=True,mode='overwrite')
df = spark.read.csv('file:///Users/wyx/project/py3.7aio/spark/a.csv',header=True)
df.show()

dataframe

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.getOrCreate()
df = spark.read.json('file:///Users/wyx/project/py3.7aio/spark/a.json')
df.show()
df.printSchema()
df.select(df.name, (df.age + 1)).show()
df.sort(df.age.desc(), df.name.asc()).show()
df.groupBy("age").count().show()
df.agg(avg("age").alias("avg_age")).show()
df.filter(df.age > 20).show()
df.select(df.name.alias("username"), df.age).show()
from pyspark.sql.types import IntegerType

# df, mean_list, max_list, and mid_list are assumed to be defined earlier in this snippet
df = df.filter(df.price != '面议').withColumn("price", df["price"].cast(IntegerType())).\
    filter(df.price >= 50).filter(df.price <= 40000)

mean_list[0] = df.filter(df.area == "海沧").agg({"price": "mean"}).first()['avg(price)']
max_list[5] = df.filter(df.area == "同安").agg({"price": "max"}).first()['max(price)']
mid_list[5] = df.filter(df.area == "同安").approxQuantile("price", [0.5], 0.01)[0]
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

cols = [('A', 'xx', 'D', 'vv', 4), ('C', 'xxx', 'D', 'vv', 10), ('A', 'x', 'A', 'xx', 3),
            ('E', 'xxx', 'B', 'vv', 3), ('E', 'xxx', 'F', 'vvv', 6), ('F', 'xxxx', 'F', 'vvv', 4),
            ('G', 'xxx', 'G', 'xxx', 4), ('G', 'xxx', 'G', 'xx', 4), ('G', 'xxx', 'G', 'xxx', 12),
            ('B', 'xxxx', 'B', 'xx', 13)]
df = spark.createDataFrame(cols, ['col1', 'col2', 'col3', 'col4', 'd'])

df.show()
# in
df.filter(col('col4').isin(['vv','xx'])).show()
# not in
df.filter(~col('col4').isin(['vv','xx'])).show()

df.filter((df.d < 5) & (
        (col('col1') != col('col3')) |
        ((col('col1') == col('col3')) & (col('col2') != col('col4')))
)).show()

RDD to DataFrame

  • Infer the schema by reflection
>>> from pyspark.sql.types import Row
>>> def f(x):
...     rel = {}
...     rel['name'] = x[0]
...     rel['age'] = x[1]
...     return rel
... 
>>> peopleDF = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line : line.split(',')).map(lambda x: Row(**f(x))).toDF()
>>> peopleDF.createOrReplaceTempView("people")  # must be registered as a temporary view before it can be queried below
 
>>> personsDF = spark.sql("select * from people")
>>> personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).foreach(print)
 
Name: 19,Age:Justin
Name: 29,Age:Michael
Name: 30,Age:Andy
  • Define the schema programmatically
>>> from pyspark.sql.types import Row
>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
 
# create the RDD
>>> peopleRDD = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
 
# define a schema string
>>> schemaString = "name age"
 
# build the schema from the schema string
>>> fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(" ")))
>>> schema = StructType(fields)
# the schema now describes two fields: name and age
 
 
>>> rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1]))
 
>>> peopleDF = spark.createDataFrame(rowRDD, schema)
 
# must be registered as a temporary view before it can be queried below
>>> peopleDF.createOrReplaceTempView("people")
 
>>> results = spark.sql("SELECT * FROM people")
>>> results.rdd.map( lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]).foreach(print)
 
name: Michael,age: 29
name: Andy,age: 30
name: Justin,age: 19

stream

Creating a StreamingContext

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# the 1 means a batch of stream computation runs every 1 second; the interval is configurable
ssc = StreamingContext(sc, 1)

setAppName("TestDStream") sets the application name. setMaster("local[2]") means run in local mode with 2 worker threads.

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName('TestDStream')
conf.setMaster('local[2]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 1)

File streams

After ssc.start() is called, the program enters a continuous monitoring loop. It only watches for files added to the directory after the program started; files that already existed are not processed.

from operator import add
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName('TestDStream')
conf.setMaster('local[2]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 20)
lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
words = lines.flatMap(lambda line: line.split(' '))
wordCounts = words.map(lambda x : (x,1)).reduceByKey(add)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

socket

sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 1)

lines = ssc.socketTextStream(host, port)
counts = lines.flatMap(lambda line: line.split(" "))\
              .map(lambda word: (word, 1))\
              .reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()

RDD queue stream (DStream)

sc = SparkContext(appName="PythonStreamingQueueStream")
ssc = StreamingContext(sc, 1)

# Create the queue through which RDDs can be pushed to
# a QueueInputDStream
rddQueue = []
for i in range(5):
    rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]

# Create the QueueInputDStream and use it do some processing
inputStream = ssc.queueStream(rddQueue)
mappedStream = inputStream.map(lambda x: (x % 10, 1))
reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
reducedStream.pprint()

ssc.start()
time.sleep(6)
ssc.stop(stopSparkContext=True, stopGraceFully=True)

Advanced input sources

Advanced input sources such as Kafka and Flume depend on separate libraries (jar files) that are not bundled with the Spark build installed earlier. Download the matching jars (spark-streaming-kafka, spark-streaming-flume) and copy them into the jars directory of the Spark installation.

  • Kafka
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
 
   
 
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)

kvs = KafkaUtils.createStream(ssc, "host:port", "spark-streaming-consumer", {topic: 1})  # "host:port" is the ZooKeeper quorum; topic is the Kafka topic name
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
  • Flume
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
import pyspark

sc = SparkContext(appName="FlumeEventCount")
ssc = StreamingContext(sc, 2)

stream = FlumeUtils.createStream(ssc, hostname, port,pyspark.StorageLevel.MEMORY_AND_DISK_SER_2)
stream.count().map(lambda cnt: "Received " + str(cnt) + " Flume events").pprint()

ssc.start()
ssc.awaitTermination()

DStream transformations

  • Stateless transformations: each batch is processed independently of previous batches.
  • Stateful transformations: processing the current batch uses data or intermediate results from previous batches. Stateful transformations include sliding-window transformations and state-tracking transformations (updateStateByKey).

updateStateByKey maintains state across batches

# RDD with initial state (key, value) pairs
initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])

def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
running_counts = lines.flatMap(lambda line: line.split(" "))\
                      .map(lambda word: (word, 1))\
                      .updateStateByKey(updateFunc, initialRDD=initialStateRDD)

window

Set a window length and a slide interval (how often the computation is executed). The meaning of the main window operations is given below; a sketch of reduceByKeyAndWindow follows the list.

  • window(windowLength, slideInterval): returns a new DStream computed from windowed batches of the source DStream.
  • countByWindow(windowLength, slideInterval): returns a sliding-window count of the elements in the stream.
  • reduceByWindow(func, windowLength, slideInterval): returns a single-element stream created by aggregating the elements of the stream over the sliding interval using func; func must be associative so the computation can run in parallel.
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): when applied to a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs in which the values for each key are aggregated over the window with the given reduce function func. Note: by default this operator uses Spark's default number of parallel tasks for the grouping; numTasks can be set to use a different number of tasks.
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): a more efficient version of reduceByKeyAndWindow in which each window's reduce value is computed incrementally from the previous window's: new data entering the window is reduced in, and old data leaving the window is "inverse reduced" out. It can only be used with invertible reduce functions, i.e. reduce functions that have a corresponding inverse (passed as invFunc).
  • countByValueAndWindow(windowLength, slideInterval, [numTasks]): when applied to a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs in which each key's value is its frequency within the sliding window.
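
A minimal sketch of the incremental form, assuming a socket source on localhost:9999 and a local checkpoint directory (both placeholders); checkpointing is required when an inverse function is used:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="WindowedWordCount")
ssc = StreamingContext(sc, 10)                    # 10-second batches
ssc.checkpoint("file:///tmp/spark-checkpoint")    # required by the inverse-reduce form (placeholder path)

lines = ssc.socketTextStream("localhost", 9999)   # placeholder host and port
pairs = lines.flatMap(lambda line: line.split(" ")).map(lambda w: (w, 1))

# Incrementally maintain counts over a 30-second window that slides every 10 seconds:
# new data entering the window is added, old data leaving the window is subtracted.
windowed = pairs.reduceByKeyAndWindow(lambda a, b: a + b,   # func
                                      lambda a, b: a - b,   # invFunc
                                      30, 10)               # windowLength, slideInterval
windowed.pprint()

ssc.start()
ssc.awaitTermination()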

DStream output

saveAsTextFiles

Saving to MySQL

def func(rdd):
    """
    Every time an RDD is saved to MySQL a database connection has to be opened, so a large number of
    partitions means a large number of connections. To keep that overhead down, the number of partitions
    should stay small; here the RDD is repartitioned to 3.
    """
    repartitionedRDD = rdd.repartition(3)
    repartitionedRDD.foreachPartition(dbfunc)  # dbfunc writes one partition's records to MySQL

running_counts.foreachRDD(func)
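
dbfunc is not shown above; a possible per-partition writer, assuming the pymysql driver and a MySQL table wordcount(word, count) (both assumptions, not part of the original), might look like this:

import pymysql  # assumed driver; any DB-API connector follows the same pattern


def dbfunc(records):
    # Open one connection per partition and reuse it for every record in that partition.
    conn = pymysql.connect(host="localhost", user="root", password="secret",
                           database="spark", charset="utf8mb4")
    try:
        with conn.cursor() as cur:
            for word, count in records:
                cur.execute("INSERT INTO wordcount (word, `count`) VALUES (%s, %s)",
                            (word, count))
        conn.commit()
    finally:
        conn.close()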

MLlib

Spark officially recommends the spark.ml (DataFrame-based) API.

Pipeline

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
prediction = model.transform(test)

prediction.show()
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
    
>>>
+---+------------------+--------------------+--------------------+--------------------+--------------------+----------+
| id|              text|               words|            features|       rawPrediction|         probability|prediction|
+---+------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  4|       spark i j k|    [spark, i, j, k]|(262144,[20197,24...|[-1.6609033227472...|[0.15964077387874...|       1.0|
|  5|             l m n|           [l, m, n]|(262144,[18910,10...|[1.64218895265644...|[0.83783256854767...|       0.0|
|  6|spark hadoop spark|[spark, hadoop, s...|(262144,[155117,2...|[-2.5980142174393...|[0.06926633132976...|       1.0|
|  7|     apache hadoop|    [apache, hadoop]|(262144,[66695,15...|[4.00817033336812...|[0.98215753334442...|       0.0|
+---+------------------+--------------------+--------------------+--------------------+--------------------+----------+

(4, spark i j k) --> prob=[0.1596407738787475,0.8403592261212525], prediction=1.000000
(5, l m n) --> prob=[0.8378325685476744,0.16216743145232562], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.06926633132976037,0.9307336686702395], prediction=1.000000
(7, apache hadoop) --> prob=[0.9821575333444218,0.01784246665557808], prediction=0.000000

TF-IDF

A term is denoted by t, a document by d, and the corpus by D.

  • Term frequency TF(t, d) is the number of times term t appears in document d.
  • Document frequency DF(t, D) is the number of documents that contain term t (the combined weight is sketched below).
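
Combining the two, Spark's IDF uses a smoothed logarithm, so the final weight is roughly:

IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1))
TFIDF(t, d, D) = TF(t, d) * IDF(t, D)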

First the Tokenizer splits each sentence into individual words. For each sentence (bag of words), HashingTF converts the sentence into a feature vector, and finally IDF rescales the feature vector. This rescaling usually improves the performance of models built on text features.

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame(
    [(0, "I heard about Spark and I love Spark"), (0, "I wish Java could use case classes"),
     (1, "Logistic regression models are neat")]).toDF("label", "sentence")

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()
rescaledData.show()

>>>
+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|(20,[0,5,9,13,17]...|
|    0|(20,[2,7,9,13,15]...|
|    1|(20,[4,6,13,15,18...|
+-----+--------------------+

+-----+--------------------+--------------------+--------------------+--------------------+
|label|            sentence|               words|         rawFeatures|            features|
+-----+--------------------+--------------------+--------------------+--------------------+
|    0|I heard about Spa...|[i, heard, about,...|(20,[0,5,9,13,17]...|(20,[0,5,9,13,17]...|
|    0|I wish Java could...|[i, wish, java, c...|(20,[2,7,9,13,15]...|(20,[2,7,9,13,15]...|
|    1|Logistic regressi...|[logistic, regres...|(20,[4,6,13,15,18...|(20,[4,6,13,15,18...|
+-----+--------------------+--------------------+--------------------+--------------------+

word2vec

  • CBOW predicts the vector of the center word from the vectors of the words in its context window.
  • Skip-gram predicts the context-window words from the center word and corrects the center word's vector based on the prediction.
from pyspark.ml.feature import Word2Vec

documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "),),
    ("I wish Java could use case classes".split(" "),),
    ("Logistic regression models are neat".split(" "),)
], ["text"])

word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
result.show()

for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

>>>>
Text: [Hi, I, heard, about, Spark] => 
Vector: [0.02057519257068634,0.027896256744861604,0.0453112430870533]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [0.014936649905783788,-0.0039047444505350927,-0.009782610195023673]

Text: [Logistic, regression, models, are, neat] => 
Vector: [-0.08842044896446169,0.010355661995708943,-0.04723416268825531]

CountVectorizer (term frequency counts)

from pyspark.ml.feature import CountVectorizer

df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b c a a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)
result = model.transform(df)
result.show(truncate=False)

>>>
+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, c, a, a]|(3,[0,1,2],[3.0,1.0,1.0])|
+---+---------------+-------------------------+

trick

# randomly split the dataset into a training set (70%) and a test set (30%)
trainingData, testData = df.randomSplit([0.7,0.3])

# explainParams() also shows the values of the parameters we have already set
print("LogisticRegression parameters:\n" + LogisticRegression().explainParams())


# model evaluation
evaluator = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
lrAccuracy = evaluator.evaluate(lrPredictions)
print("Test Error = " + str(1.0 - lrAccuracy))

evaluatorClassifier = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
accuracy = evaluatorClassifier.evaluate(predictionsClassifier)
print("Test Error = " + str(1.0 - accuracy))

evaluatorRegressor = RegressionEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("rmse")
rmse = evaluatorRegressor.evaluate(predictionsRegressor)
print("Root Mean Squared Error (RMSE) on test data = " +str(rmse))


# pipeline get stage
lrModel = lrPipelineModel.stages[2]
print("Coefficients: " + str(lrModel.coefficients)+"Intercept: "+str(lrModel.intercept)+"numClasses: "+str(lrModel.numClasses)+"numFeatures: "+str(lrModel.numFeatures))
Coefficients: [-0.0396171957643483,0.0,0.0,0.07240315639651046]Intercept: -0.23127346342015379numClasses: 2numFeatures: 4

Java code

parallelize to get a JavaRDD

Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));

map reduce

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());