注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

阿弥陀佛

街树飘影未见尘 潭月潜水了无声 般若观照心空静...

 
 
 

日志

 
 
关于我

一直从事气象预报、服务建模实践应用。 注重气象物理场、实况场、地理信息、本体知识库、分布式气象内容管理系统建立。 对Barnes客观分析, 小波,计算神经网络、信任传播、贝叶斯推理、专家系统、网络本体语言有一定体会。 一直使用Java、Delphi、Prolog、SQL编程。

网易考拉推荐

Spark GBT ,RDD, LabeledPoint测试代码  

2016-02-14 22:33:41|  分类: Spark |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
package data.hour.test

import java.io.File

import common.EleChk._
import common.Utils._
import data.hour.EleH
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, DataFrame}

import scala.collection.immutable.SortedMap

/**
* Created by 何险峰,成都 on 16-2-14.
*/
object EleGBT {
import EleH._
import common.SparkSc._

def main(args: Array[String]) {
val ymdh = "2016012506" //PRE_1h
val eleIdx = EleNms.indexOf("TEM")
val rdd = EleH(ymdh)
//val staArr = mk_libsvm(rdd0,eleIdx)
val staEleMap = mk_model(rdd,eleIdx)
sc.stop
}
def mk_libsvm(rdd : RDD[EleH],eleIdx : Int,fnm:String) ={
def eleh2LabeledPoint(h: EleH,eleIdx : Int):LabeledPoint={
import org.apache.spark.mllib.linalg.Vectors
val loc = Array(h.lat, h.lon, h.alt)
val eles= Array(h.prs, h.prsSea, h.tem, h.dpt, h.rhu, h.vap, h.pre1h,
h.winDAvg2mi, h.winSAvg2mi,
h.gst, h.t5cm,h.t10cm, h.t15cm, h.t20cm, h.t40cm,
h.u, h.v, h.qse,
h.gtem,h.vism,h.dddmax,h.ffmax
)
val lb = getVal(h,eleIdx).toDouble
val arr = loc ++ eles.slice(0,eleIdx) ++ eles.slice(eleIdx+1,eles.length)
val ele_idxArr = arr.zipWithIndex
.filter(f => !eqMiss(f._1))
.map(f => (f._2,f._1.toDouble))
val features = Vectors.sparse(arr.length,ele_idxArr)
LabeledPoint(lb,features)
}
val data0 = rdd.filter(eleh => !eqMiss(getVal(eleh, eleIdx)))
val data = data0.map(f => eleh2LabeledPoint(f,eleIdx))
MLUtils.saveAsLibSVMFile(data,fnm)
//data0.map(f => f.sta).collect
}
def mk_model(rdd : RDD[EleH],eleIdx : Int) {
val fnm = s"${common.Config.libsvmDir}/${EleNms(eleIdx)}.txt"
mk_libsvm(rdd,eleIdx,fnm)
val data = sqlContext.read.format("libsvm").load(fnm)
val newData = mkGbt(data,eleIdx)
val lb = data.select("label").collect.map(r => r.getAs[Double]("label")).map(_.toFloat)
val newEleArr = newData.collect.map(r => r.getAs[Double]("label")).map(_.toFloat)
val z = lb zip newEleArr
val new_rmse = MSE(z)
println(s"new rmse=${new_rmse}")
}
def mkGbt(data : DataFrame,eleIdx : Int) : DataFrame={
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(2)
.fit(data)
val gbt = new GBTRegressor()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures")
.setMaxIter(30)
.setLossType("squared")
val pipeline = new Pipeline()
.setStages(Array(featureIndexer, gbt))
val model = pipeline.fit(data)
val predictions = model.transform(data)
//predictions.select("prediction", "label", "features").show(3)
val evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"RMSE on ${EleNms(eleIdx)} data = ${rmse}\n")
logMsg(common.Config.logGbtDir,EleNms(eleIdx),"RMSE0",rmse.toString)
// 第二次拟合
val data1 = predictions.selectExpr("prediction as label","features")
val model1 = pipeline.fit(data1)
val predictions1 = model1.transform(data1)
val rmse1 = evaluator.evaluate(predictions1)
println(s"RMSE on ${EleNms(eleIdx)} data = ${rmse1}\n")
logMsg(common.Config.logGbtDir,EleNms(eleIdx),"RMSE1",rmse1.toString)
predictions1.selectExpr("prediction as label","features")
}
}
  评论这张
 
阅读(375)| 评论(0)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017