我写了一个javaweb应用程序,它在我安装在自己电脑上的独立spark上运行良好。但是当我提交到spark集群服务器时,出现了错误。
服务器端群集的环境:
java:1.8.0\u 74版
hadoop:2.6.0版
Spark:1.6.1
斯卡拉:2.10.5
我自己的电脑环境。
java:1.8.0\u 20版
hadoop:2.6.0版
Spark:1.6.1
斯卡拉:2.10.5
错误如下:
16/04/23 22:57:58 WARN TaskSetManager: Lost task 6.0 in stage 0.0 (TID 0, student80-x2): java.io.InvalidClassException: javax.servlet.GenericServlet; local class incompatible: stream classdesc serialVersionUID = 1, local class serialVersionUID = -8592279577370996712
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
我的maven pom.xml是:
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.10</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
<build>
<finalName>spark-demo-web</finalName>
</build>
我的代码如下:
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
long t1,t2;
t1=System.currentTimeMillis();
//Spark Context
//String input = "hdfs://localhost:9000/plane/2008c.csv";
String input = "hdfs://10.42.1.61:9000/plane/2008c.csv";
SparkConf conf = new SparkConf().setAppName("org.spark.flight").setMaster("spark://10.42.1.61:7077").set("spark.driver.allowMultipleContexts", "true");
JavaSparkContext context = new JavaSparkContext(conf);
SQLContext sqlContext=new org.apache.spark.sql.SQLContext(context);
//Create DataFrame
JavaRDD<Flight> flights=context.textFile(input).map(
new Function<String, Flight>(){
public Flight call(String line) throws Exception{
String[] parts=line.split(",");
Flight flight=new Flight();
flight.setaYear(Double.parseDouble(parts[0]));
flight.setbMonth(Double.parseDouble(parts[1]));
flight.setcDayofMonth(Double.parseDouble(parts[2]));
flight.seteUniqueCarrier(parts[8]);
flight.setdFlightNum(parts[9]);
flight.setfOrigin(parts[16]);
flight.setgDest(parts[17]);
if(parts[4].toString().equals("NA")) flight.setiDepTime(0.0);
else flight.setiDepTime(Double.parseDouble(parts[5]));
if(parts[6].toString().equals("NA")) flight.setjArrTime(0.0);
else flight.setjArrTime(Double.parseDouble(parts[6]));
if(parts[15].toString().equals("NA")) flight.setkDepDelay(0.0);
else flight.setkDepDelay(Double.parseDouble(parts[15]));
return flight;
}
});
DataFrame schemaFlight=sqlContext.createDataFrame(flights, Flight.class);
schemaFlight.registerTempTable("flights");
//Select using SQL
//String name = request.getParameter("name");
//String password = request.getParameter("password");
int year=Integer.parseInt(req.getParameter("year")), todayYear=2008;
int month=Integer.parseInt(req.getParameter("month")), todayMonth=3;
int dayofmonth=Integer.parseInt(req.getParameter("dayofmonth")), todayDayOfMonth=15;
String from=req.getParameter("from");
String to=req.getParameter("to");
//Select flights according to the users requirement
//String SQL="Select * from flights where year="+year+" AND month="+month+" AND dayofMonth="+dayofmonth+" AND origin='"+from+"' AND dest='"+to+"'";
String SQL2="Select * from flights WHERE fOrigin='"+ from + "' AND gDest='" + to +"' AND aYear="+ year +" AND bMonth=" + month + " AND cDayofMonth="+dayofmonth+" order by iDepTime";
DataFrame selectFlights=sqlContext.sql(SQL2);
selectFlights.registerTempTable("sf");
selectFlights.cache();
selectFlights.show();
//Map flights data to POJO
List<Flight> result=selectFlights.javaRDD().map(new Function<Row,Flight>(){
public Flight call(Row row){
Flight flight=new Flight();
flight.setaYear(row.getDouble(0));
flight.setbMonth(row.getDouble(1));
flight.setcDayofMonth(row.getDouble(2));
flight.seteUniqueCarrier(row.getString(4));
flight.setdFlightNum(row.getString(3));
flight.setfOrigin(row.getString(5));
flight.setgDest(row.getString(6));
flight.setiDepTime(row.getDouble(8));
flight.setjArrTime(row.getDouble(9));
return flight;
}
}).collect();
//Select out the UniqueCarriers
String SQL3="Select distinct eUniqueCarrier from sf";
DataFrame uc=sqlContext.sql(SQL3);
List<String> ucList=uc.javaRDD().map(new Function<Row, String>(){
public String call(Row row){
return row.getString(0);
}
}).collect();
//Build Linear Regression for each Unique Carrier
Iterator<String> ucIterator=ucList.iterator();
while(ucIterator.hasNext()){
String oneuc=ucIterator.next();
//Select out flight history of the Unique Carrier "oneuc".
System.out.println(oneuc);
String SQL4="Select * from flights WHERE fOrigin='" + from +"' AND gDest='" + to +"' AND (aYear<"+todayYear+" OR (aYear="+todayYear+" and bMonth<"+ todayMonth+") OR (aYear="+todayYear+" AND bMonth="+todayMonth+" AND cDayofMonth<"+todayDayOfMonth+")) AND eUniqueCarrier='"+oneuc+"'";
DataFrame history=sqlContext.sql(SQL4);
//Parse the data
JavaRDD<LabeledPoint> parsedData=history.javaRDD().map(
new Function<Row,LabeledPoint> (){
public LabeledPoint call(Row r) {
double[] v=new double[4];
v[0]=r.getDouble(0);
v[1]=r.getDouble(1);
v[2]=r.getDouble(2);
v[3]=r.getDouble(8);
System.out.println(r.getDouble(10)+ ": "+ v[0] + " " + v[1] + " " + v[2] + " " + v[3]);
return new LabeledPoint(r.getDouble(10), Vectors.dense(v));
}
});
parsedData.cache();
System.out.println("**********"+parsedData.count()+"***********");
// Build the model
int numIterations = 100;
double stepSize = 0.00000001;
//parsedData.rdd()
final LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations,stepSize);
System.out.println("***********Prediction*************");
//Predict
Iterator<Flight> resultIterator=result.iterator();
Flight tempf;
while(resultIterator.hasNext()){
tempf=resultIterator.next();
if(tempf.geteUniqueCarrier().equals(oneuc)){
double[] v=new double[4];
v[0]=tempf.getaYear();
v[1]=tempf.getbMonth();
v[2]=tempf.getcDayofMonth();
v[3]=tempf.getiDepTime();
double prediction=model.predict( Vectors.dense(v));
tempf.sethPredictDelay(prediction);
}
}
}
1条答案
按热度按时间oalqel3c1#
spark1.6.1打包版本的servlet api和打包到jar中的版本是不同的。spark使用
org.eclipse.jetty.orbit网站» javax.servlet 3.0.0.v20111016版本
http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.6.1
如果可能的话,尽量不要将servlet api打包到jar中