Commit 12aa1d92 authored by mxx

Positive/negative feedback

parent 242e5786
@@ -83,6 +83,8 @@ subprojects {
compile(group: 'com.google.guava', name: 'guava', version: '14.0.1') {
    force = true
}
//
// compile(group: 'com.esotericsoftware', name: 'kryo', version: '4.0.1') {
//     force = true
...
----------------------------------------------------------------
-Tue Aug 08 17:53:20 CST 2017:
+Thu Jul 05 11:38:39 CST 2018:
-Booting Derby version The Apache Software Foundation - Apache Derby - 10.10.2.0 - (1582446): instance a816c00e-015d-c143-4d70-00001bbd4f08
+Booting Derby version The Apache Software Foundation - Apache Derby - 10.10.2.0 - (1582446): instance a816c00e-0164-6885-3ad7-00000ee09d18
-on database directory C:\Users\pc\AppData\Local\Temp\spark-c4ff8077-66ca-4b0c-94ad-67253a5b23d4\metastore with class loader java.net.URLClassLoader@b35fb1c
+on database directory D:\duiba_code\tuia-nezha-compute-feature-lwj-20180515\metastore_db with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@43b4ec0c
-Loaded from file:/C:/Users/pc/.m2/repository/org/apache/derby/derby/10.10.2.0/derby-10.10.2.0.jar
+Loaded from file:/C:/Users/85585/.m2/repository/org/apache/derby/derby/10.10.2.0/derby-10.10.2.0.jar
java.vendor=Oracle Corporation
-java.runtime.version=1.7.0_79-b15
+java.runtime.version=1.8.0_111-b14
-user.dir=C:\Users\pc\IdeaProjects\tuia-nezha-compute
+user.dir=D:\duiba_code\tuia-nezha-compute-feature-lwj-20180515
-os.name=Windows 7
+os.name=Windows 10
os.arch=amd64
-os.version=6.1
+os.version=10.0
derby.system.home=null
Database Class Loader started - derby.database.classpath=''
@@ -80,14 +80,9 @@ nezha.compute.mongo.db.replsetname = tuia-repset
#nezha.compute.mongo.web.key =c31de3ab31034259bba06658682954f3
#mongodb web api develop
-nezha.compute.mongo.web.host =http://192.168.2.84:9666
+#nezha.compute.mongo.web.host =http://192.168.2.84:9666
-nezha.compute.mongo.web.key =c31de3ab31034259bba06658682954f3
-#mongodb web api online
-#nezha.compute.mongo.web.host =http://101.37.205.40
#nezha.compute.mongo.web.key =c31de3ab31034259bba06658682954f3
#mongodb web api dedicated line
-#nezha.compute.mongo.web.host =http://10.10.93.196
+nezha.compute.mongo.web.host =http://10.10.93.196
-#nezha.compute.mongo.web.key =c31de3ab31034259bba06658682954f3
+nezha.compute.mongo.web.key =c31de3ab31034259bba06658682954f3
@@ -16,7 +16,8 @@ public class TestRead{
public static void main(String[] args) {
System.out.println("s start time = " + DateUtil.getCurrentTime(DateStyle.YYYY_MM_DD_HH_MM_SS_SSS));
try {
-BufferedReader readerslotid = new BufferedReader(new FileReader("D:\\duiba_code\\flow_match\\slotid.csv"));
+//BufferedReader readerslotid = new BufferedReader(new FileReader("D:\\duiba_code\\for-app-fee0\\slotid.csv"));
+BufferedReader readerslotid = new BufferedReader(new FileReader("/home/mengxiangxuan/for-app-fee0/slotid.csv"));
readerslotid.readLine();
String slotline = null;
List<String> slotidlist = new ArrayList();
@@ -28,20 +29,20 @@ public class TestRead{
// String slotid=slotidlist.get(2);
// System.out.println(slotid);
// String slotid = "68";
-//slotidlist.size()
-AdvertModelEntity entity = AdvertCtrLrModelBo.getCTRModelByKeyFromMD(ModelKeyEnum.FM_CTR_MODEL_v003.getIndex());
+AdvertModelEntity entity = AdvertCtrLrModelBo.getCTRModelByKeyFromMD(ModelKeyEnum.FM_CVR_MODEL_v003.getIndex());
final FM fmModel = new FM(entity);
-for (int slot=0;slot<1;slot++) {
+for (int slot=0;slot<slotidlist.size();slot++) {
System.out.println("go "+slot);
String slotid=slotidlist.get(slot);
System.out.println("slotid "+slotid);
-BufferedReader readerAdF = new BufferedReader(new FileReader("D:\\duiba_code\\flow_match\\not_luanch_ad_info.csv"));
-//BufferedReader readerAdF = new BufferedReader(new FileReader("D:\\duiba_code\\a.csv"));
+//BufferedReader readerAdF = new BufferedReader(new FileReader("D:\\duiba_code\\for-app-fee0\\not_luanch_ad_info.csv"));
+BufferedReader readerAdF = new BufferedReader(new FileReader("/home/mengxiangxuan/for-app-fee0/not_luanch_ad_info.csv"));
readerAdF.readLine();//the first line is the header and is skipped; comment this out if it is needed
String Adline = null;
List<Map<String, String>> adFeatrueMaplist = new ArrayList<Map<String, String>>();
while ((Adline = readerAdF.readLine()) != null) {
-String item[] = Adline.split(",");//the CSV file is comma-delimited, so split on commas
+String item[] = Adline.split("\\|");//the file here is pipe-delimited, so split on the regex-escaped "|"
if (item[0].equals(slotid)) {
Map<String, String> adfeatrueMap = new HashMap<String, String>();
adfeatrueMap.put("f108001", item[0].replace(".0",""));
@@ -59,13 +60,13 @@ public class TestRead{
// System.out.println(adFeatrueMaplist.get(i));
// }
-BufferedReader readerSnF = new BufferedReader(new FileReader("D:\\duiba_code\\flow_match\\not_luanch_scene.csv"));
-//BufferedReader readerSnF = new BufferedReader(new FileReader("D:\\duiba_code\\b.csv"));
+//BufferedReader readerSnF = new BufferedReader(new FileReader("D:\\duiba_code\\for-app-fee0\\not_luanch_scene.csv"));
+BufferedReader readerSnF = new BufferedReader(new FileReader("/home/mengxiangxuan/for-app-fee0/not_luanch_scene.csv"));
readerSnF.readLine();
String Snline = null;
List<Map<String, String>> snFeatrueMaplist = new ArrayList<Map<String, String>>();
while ((Snline = readerSnF.readLine()) != null) {
-String item[] = Snline.split(",");//the CSV file is comma-delimited, so split on commas
+String item[] = Snline.split("\\|");//the file here is pipe-delimited, so split on the regex-escaped "|"
if (item[1].equals(slotid)) {
Map<String, String> snfeatrueMap = new HashMap<String, String>();
snfeatrueMap.put("f201001", item[0].replace(".0",""));
@@ -155,7 +156,8 @@ public class TestRead{
String maptostr = stdCvrMap.toString();
// System.out.println(maptostr);
-File f = new File("D:\\duiba_code\\slot_ad_stat_ctr.txt");
+//File f = new File("D:\\duiba_code\\for-app-fee0\\slot_ad_stat_cvr.txt");
+File f = new File("/home/mengxiangxuan/for-app-fee0/slot_ad_stat_cvr.txt");
BufferedWriter output = new BufferedWriter(new FileWriter(f,true));//true means append to the text file
output.write(maptostr);
output.write("\r\n");//line break
...
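The TestRead hunks above change the record delimiter from "," to "\\|". String.split takes a regular expression, so an unescaped "|" is an alternation of two empty patterns and splits every character into its own token; the escaped form is what pipe-delimited input needs. A minimal stand-alone illustration (hypothetical class, not part of this commit):

package notOriPreview;
import java.util.Arrays;
// Hypothetical demo, not part of this commit: why the delimiter is written as "\\|".
public class SplitDelimiterDemo {
    public static void main(String[] args) {
        String line = "444|1001|0.5";
        // escaped pipe: split on the literal '|' character
        System.out.println(Arrays.toString(line.split("\\|"))); // [444, 1001, 0.5]
        // unescaped pipe: every character becomes its own token
        System.out.println(Arrays.toString(line.split("|")));
    }
}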
@@ -51,7 +51,7 @@ public class Testmxx extends TestCase {
public void testPredict() throws Exception {
// AdvertModelEntity entity = AdvertCtrLrModelBo.getCTRDtModelByKeyToMD(ModelKeyEnum.FM_CVR_MODEL_v611.getIndex(), "2018-04-15");
-AdvertModelEntity entity = AdvertCtrLrModelBo.getCTRModelByKeyFromMD(ModelKeyEnum.FM_CTR_MODEL_v003.getIndex());
+AdvertModelEntity entity = AdvertCtrLrModelBo.getCTRModelByKeyFromMD(ModelKeyEnum.FM_CVR_MODEL_v003.getIndex());
//
// if (null == entity) {
// System.out.println("the model entity is null.");
...
@@ -11,4 +11,17 @@ repositories {
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
+compile project(':nezha-compute-deploy')
+compile project(':nezha-compute-alg')
+compile project(':nezha-compute-api')
+compile project(':nezha-compute-biz')
+compile project(':nezha-compute-common')
+compile project(':nezha-compute-mllib')
+compile project(':nezha-compute-stat')
+compile(group: 'org.apache.spark', name: 'spark-hive_2.11', version: '1.6.1')
+compile group: 'cn.com.duiba.nezha-compute-online', name: 'compute-feature', version: '1.0.3-SNAPSHOT'
+compile group: 'cn.com.duiba.nezha-compute-online', name: 'compute-core', version: '1.0.3-SNAPSHOT'
+compile group: 'cn.com.duiba.nezha-compute-online', name: 'compute-alg', version: '1.0.3-SNAPSHOT'
+compile group: 'com.databricks', name: 'spark-csv_2.10', version: '1.4.0'
}
package notOriPreview;
import cn.com.duiba.nezha.compute.alg.ftrl.FMModel;
import cn.com.duiba.nezha.compute.api.constant.GlobalConstant;
import cn.com.duiba.nezha.compute.biz.utils.mongodb.MongoUtil;
import cn.com.duiba.nezha.compute.common.util.AssertUtil;
public class ModelSave {
/**
 * @param modelKey model identifier
 * @return the MongoDB key under which the latest model for modelKey is stored
 */
private static String getLastModelKey(String modelKey) {
return "nz_last_model_" + modelKey + "_";
}
/**
 * @param modelKey model identifier
 * @return the stored FMModel for modelKey, or null if the key is empty or the lookup fails
 */
public static FMModel getModelByKeyFromMD(String modelKey) {
FMModel ret = null;
if (AssertUtil.isAnyEmpty(modelKey)) {
System.out.println("getCTRModelByKeyFromMD empty,modelKey=" + modelKey);
return ret;
}
try {
// build the cache key
String key = getLastModelKey(modelKey);
System.out.println("read model with key="+key);
// load the last saved model from MongoDB
ret = MongoUtil.findByIdT(GlobalConstant.LR_MODEL_ES_TYPE, key, FMModel.class);
} catch (Exception e) {
e.printStackTrace();
}
return ret;
}
}
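ModelSave wraps a single MongoDB lookup: it prefixes the model key with "nz_last_model_" and reads the stored FMModel document. A minimal usage sketch, assuming the MongoUtil connection is configured as in the rest of this module; the class name is hypothetical and the key "mid_ftrl_fm_cvr_v006" is the one referenced in PreviewPredictStateSave further down:

package notOriPreview;
import cn.com.duiba.nezha.compute.alg.ftrl.FMModel;
// Hypothetical smoke test, not part of this commit.
public class ModelSaveSmokeTest {
    public static void main(String[] args) {
        // resolves to the Mongo document id "nz_last_model_mid_ftrl_fm_cvr_v006_"
        FMModel model = ModelSave.getModelByKeyFromMD("mid_ftrl_fm_cvr_v006");
        System.out.println(model == null ? "no model found" : "model loaded");
    }
}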
package notOriPreview;
//
//import cn.com.duiba.nezha.compute.alg.ftrl.FMModel;
//import cn.com.duiba.nezha.compute.common.enums.DateStyle;
//import cn.com.duiba.nezha.compute.common.util.DateUtil;
//import org.apache.log4j.Level;
//import org.apache.spark.SparkConf;
//import org.apache.spark.api.java.JavaRDD;
//import org.apache.spark.api.java.JavaSparkContext;
//import org.apache.spark.api.java.function.*;
//import org.apache.spark.sql.*;
//import org.apache.log4j.Logger;
//import java.io.*;
//import com.alibaba.fastjson.JSON;
//
//import java.util.*;
//
//public class PreviewPredictStateSaveJava {
// public static void main(String[] args) throws IOException {
// Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);
// System.out.println("s start time = " + DateUtil.getCurrentTime(DateStyle.YYYY_MM_DD_HH_MM_SS_SSS));
//
// // set up the Spark context
// SparkConf conf = new SparkConf().setAppName("PreviewPredict").setMaster("local");
// JavaSparkContext sc = new JavaSparkContext(conf);
// SQLContext sqlContext = new SQLContext(sc);
// //HiveContext hiveContext = new HiveContext(sc.sc());
// // read the flow info, the ad info and the model from HDFS
// //JavaRDD<String> flowInfoRDD = sc.textFile("file:///D:/duiba_code/for-app-fee0/not_luanch_scene.csv");
// //JavaRDD<String> adInfoRDD = sc.textFile("file:///D:/duiba_code/for-app-fee0/not_luanch_ad_info.csv");
// HashMap<String, String> optionsSN = new HashMap<String, String>();
// optionsSN.put("header", "true");
// optionsSN.put("delimiter", "|");
// optionsSN.put("path", "D:/duiba_code/for-app-fee0/not_luanch_scene.csv");
// //optionsSN.put("path", "hdfs://nameservice1/user/mengxiangxuan/not_luanch_scene.csv");
// HashMap<String, String> optionsAD = new HashMap<String, String>();
// optionsAD.put("header", "true");
// optionsAD.put("delimiter", "|");
// optionsAD.put("path", "D:/duiba_code/for-app-fee0/not_luanch_ad_info.csv");
// //optionsAD.put("path", "hdfs://nameservice1/user/mengxiangxuan/not_luanch_scene.csv");
// DataFrame flowInfoDF=sqlContext.load("com.databricks.spark.csv", optionsSN);
// DataFrame adInfoDF=sqlContext.load("com.databricks.spark.csv", optionsAD);
//// flowInfoDF.show(9);
//// adInfoDF.show(9);
//
//// AdvertModelEntity entity = AdvertCtrLrModelBo.getCTRModelByKeyFromMD(ModelKeyEnum.FM_CVR_MODEL_v003.getIndex());
//// final FM model = new FM(entity);
//// System.out.println("model---------"+model);
//
// String[] slot={"f108001"};// the ad-slot id field
// List<Row> slotid=adInfoDF.dropDuplicates(slot).select("f108001").collectAsList();
//
// //System.out.println(slotid);
//// for (Row i:slotid){
//// System.out.println("slotid: "+i.get(0));
//// DataFrame slotFlowInfoDF=flowInfoDF.where("f108001=i.get(0)");
//// DataFrame slotAdInfoDF=adInfoDF.where("f108001=i.get(0)");
//// slotFlowInfoDF.join(slotAdInfoDF,slotFlowInfoDF.col("f108001").equalTo(slotAdInfoDF.col("f108001")),"inner");
////
////
//// }
//
// System.out.println("slotid: "+"444,999");
//
// DataFrame slotFlowInfoDF=flowInfoDF.where(String.format("f108001 in ('%s','%s')", "444", "99"));
// DataFrame slotAdInfoDF=adInfoDF.where(String.format("f108001 in ('%s','%s')", "444", "99"));
// DataFrame bindInfoDF=slotFlowInfoDF.join(slotAdInfoDF,slotFlowInfoDF.col("f108001").equalTo(slotAdInfoDF.col("f108001")),"inner");
// System.out.println("slotFlowInfoDF.count()-----"+slotFlowInfoDF.count());
// System.out.println("slotAdInfoDF.count()-----"+slotAdInfoDF.count());
// System.out.println("bindInfoDF.count()-----"+bindInfoDF.count());
// //bindInfoDF.show(2);
//
// //DataFrame sampletest=bindInfoDF.limit(9);
// JavaRDD<Row> sampleRow = bindInfoDF.toJavaRDD();
// sampleRow.foreachPartition(
// new VoidFunction<Iterator<Row>>() {
// @Override
// public void call(Iterator<Row> iteratorRow) throws Exception {
// GetModel.iteratorpredictSample(iteratorRow);
// }
// });
//
//
//
// System.out.println("------------------------------------------------------------------------------------- ");
//
//
// }
//
//
// }
//
//
//
package notOriPreview;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class Test {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").
setAppName("RDDToDataFrameByReflection");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
List<TestPersonInfo> personList=new ArrayList<>();
personList.add(new TestPersonInfo("Andy","32"));
personList.add(new TestPersonInfo("Michael","23"));
personList.add(new TestPersonInfo("Justin","19"));
JavaRDD<TestPersonInfo> personJavaRDD = sc.parallelize(personList);
System.out.println("1. 直接构建出 JavaRDD<Person>");
Encoder<TestPersonInfo> personEncoder = Encoders.bean(TestPersonInfo.class);
DataFrame personDF = sqlContext.createDataFrame(personList, TestPersonInfo.class);
System.out.println("3. 直接构建出 Dataset<Row>");
personDF.show();
personDF.printSchema();
JavaRDD<Row> personRDD = personDF.toJavaRDD();
System.out.println(personRDD.count());
System.out.println(personRDD.count());
JavaRDD<Map<String,String>> personMapRDD=personRDD.map(new Function<Row,Map<String, String>>() {
private static final long serialVersionUID = 1L;
@Override
public Map<String,String> call(Row row) throws Exception {
Map<String, String> sampleMap = new HashMap<String, String>();
sampleMap.put("name",row.getAs("name").toString());
sampleMap.put("age",row.getAs("age").toString());
return (sampleMap);
}
}
);
System.out.println(personMapRDD.collect());
}
}
\ No newline at end of file
Manifest-Version: 1.0
Main-Class: notOriPreview.Test
package notOriPreview
import java.io.IOException
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
object HDFSUtil {
val conf: Configuration = new Configuration()
var fs: FileSystem = null
var files: RemoteIterator[LocatedFileStatus] = null
def getFiles(HDFSPath: String) = {
try {
fs = FileSystem.get( new URI( HDFSPath ), conf )
} catch {
case e: IOException => {
e.printStackTrace
}
}
files
}
def getFiles(HDFSPath: String, targetPath: String) = {
try {
fs = FileSystem.get( new URI( HDFSPath ), conf )
// list all files directly under the target path (non-recursive)
files = fs.listFiles( new Path( targetPath ), false )
} catch {
case e: IOException => {
e.printStackTrace
}
}
files
}
def mkdir(finalPath: String) = {
fs.create( new Path( finalPath ) )
}
def rename(oldPath: String, finalPath: String) = {
fs.rename( new Path( oldPath ), new Path( finalPath ) )
}
def exist(existPath: String): Boolean = {
fs.exists( new Path( existPath ) )
}
def delete(deletePath: String) = {
fs.delete( new Path( deletePath ), true )
}
def read(readPath: String) = {
fs.open( new Path( readPath ) )
}
def close() = {
try {
if (fs != null) {
fs.close()
}
} catch {
case e: IOException => {
e.printStackTrace
}
}
}
}
package notOriPreview
import cn.com.duiba.nezha.compute.alg.FM
import org.apache.spark.{SparkConf, SparkContext}
import cn.com.duiba.nezha.compute.alg.ftrl.FMModel
import cn.com.duiba.nezha.compute.api.dto.AdvertModelEntity
import cn.com.duiba.nezha.compute.api.enums.ModelKeyEnum
import cn.com.duiba.nezha.compute.biz.bo.AdvertCtrLrModelBo
import cn.com.duiba.nezha.compute.common.enums.DateStyle
import cn.com.duiba.nezha.compute.common.util.DateUtil
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.log4j.Logger
import java.io.File
object PreviewPredictStateSave {
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
System.out.println("s start time = " + DateUtil.getCurrentTime(DateStyle.YYYY_MM_DD_HH_MM_SS_SSS))
// set up the Spark context
val conf = new SparkConf().setAppName("PreviewPredict")
val sc = new JavaSparkContext(conf)
val sqlContext = new SQLContext(sc)
val flowInfoDF = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true") //这里如果在csv第一行有属性的话,没有就是"false"
.option("inferSchema", "true") //这是自动推断属性列的数据类型。
.option("delimiter", "|")
//.load("D:/duiba_code/for-app-fee0/not_luanch_scene.csv") //文件的路径
.load("hdfs://nameservice1/user/mengxiangxuan/not_luanch_ad_info.csv").repartition(60) //文件的路径
val adInfoDF = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true") //这里如果在csv第一行有属性的话,没有就是"false"
.option("inferSchema", "true") //这是自动推断属性列的数据类型。
.option("delimiter", "|")
//.load("D:/duiba_code/for-app-fee0/not_luanch_ad_info.csv") //文件的路径
.load("hdfs://nameservice1/user/mengxiangxuan/not_luanch_scene.csv").repartition(60) //文件的路径
flowInfoDF.show(9)
//adInfoDF.show(9)
val slot = Array("f108001")
val slotid = adInfoDF.dropDuplicates(slot).select("f108001").collectAsList
// val slotFlowInfoDF = flowInfoDF.where(String.format("f108001 in ('%s','%s')", "444","99")).repartition(60)
// val slotAdInfoDF = adInfoDF.where(String.format("f108001 in ('%s','%s')", "444","99")).repartition(60)
// val bindInfoDF = slotFlowInfoDF.join(slotAdInfoDF, slotFlowInfoDF("f108001") === slotAdInfoDF("f108001"), "inner").repartition(60)
// //System.out.println("slotFlowInfoDF.count()-----" + slotFlowInfoDF.count)
// //System.out.println("slotAdInfoDF.count()-----" + slotAdInfoDF.count)
// //System.out.println("bindInfoDF.count()-----" + bindInfoDF.count)
//
// //val model=ModelSave.getModelByKeyFromMD("mid_ftrl_fm_cvr_v006")
//
//
// //val bindInforRdd=bindInfoDF.rdd
// val retMapStrRDD = bindInfoDF.mapPartitions(partiton => {
// val model = GetModel.getModel()
// System.out.println("model---------" + model)
// partiton.map(sample => {
// GetModel.predictSample(model, sample)
// })
// }).repartition(48)
// //retMapStrRDD.take(8).foreach(println)
// val retMapPairRDD = retMapStrRDD.map(retMapStr => retMapStr.split("=")).map(retArray => (retArray(0), retArray(1).toFloat)).repartition(48)
// val statMapPairRDD = retMapPairRDD.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(x => (x._1 / x._2)).repartition(48)
// //statMapPairRDD.take(8).foreach(println)
//
// // saving fails if the output directory already exists, so delete it in advance
// val HDFSPath: String = "hdfs://nameservice1/user/mengxiangxuan/"
// val newTargetPath: String = HDFSPath + "slot_ad_stat_cvr"
// try {
// // initialise the HDFS FileSystem handle
// HDFSUtil.getFiles(HDFSPath)
// // if the output path already exists, delete it
// if (HDFSUtil.exist(newTargetPath) == true) {
// HDFSUtil.delete(newTargetPath)
// println("deleted " + newTargetPath + " successfully")
// } else {
// println(newTargetPath + " not found, it will be created")
// }
// }
//
// //statMapPairRDD.coalesce(1, true).saveAsTextFile(newTargetPath)
// statMapPairRDD.saveAsTextFile(newTargetPath)
// System.out.println("s start time = " + DateUtil.getCurrentTime(DateStyle.YYYY_MM_DD_HH_MM_SS_SSS))
}
}
\ No newline at end of file
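The commented-out tail of PreviewPredictStateSave averages the predicted score per key by mapping each value to (value, 1), summing both components with reduceByKey, and dividing. A stand-alone sketch of that aggregation on toy data, written against the Java API used elsewhere in this commit (class name and sample values are hypothetical):

package notOriPreview;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
// Hypothetical sketch, not part of this commit: mean predicted score per key.
public class AvgByKeySketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("AvgByKeySketch").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // toy "key=score" strings in the format produced by the prediction step
        JavaRDD<String> retMapStrRDD = sc.parallelize(Arrays.asList("444_1=0.2", "444_1=0.4", "99_7=0.1"));
        JavaPairRDD<String, Float> retMapPairRDD = retMapStrRDD.mapToPair(s -> {
            String[] kv = s.split("=");
            return new Tuple2<>(kv[0], Float.parseFloat(kv[1]));
        });
        // (sum, count) per key, then sum / count gives the mean score per key
        JavaPairRDD<String, Float> statMapPairRDD = retMapPairRDD
                .mapValues(v -> new Tuple2<>(v, 1))
                .reduceByKey((a, b) -> new Tuple2<>(a._1 + b._1, a._2 + b._2))
                .mapValues(t -> t._1 / t._2);
        statMapPairRDD.collect().forEach(System.out::println);
        sc.stop();
    }
}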
package notOriPreview
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object TestWordCount {
def main(args: Array[String]) {
//val inputFile = "hdfs://user/mengxiangxuan/hello.txt"
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
//val textFile = sc.textFile(inputFile)
val list = List("a b c","xx","1 23 45","c c c","88 88")
val lines = sc.parallelize(list)
val wordCount = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCount.foreach(println)
}
}
package notOriPreview
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
object Testconnecthive {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("test").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
//import hiveContext.implicits._
sqlContext.table("tmp.advert.dws_not_luanch_slot_adinfo_mxx") // format: database.table
.registerTempTable("advert.dws_not_luanch_slot_adinfo_mxx") // register as a temporary table
sqlContext.sql(
"""
select * from advert.dws_not_luanch_slot_adinfo_mxx limit 10
""".stripMargin).show()
sc.stop()
}
}
\ No newline at end of file