org.apache.spark.broadcast.Broadcast.value()方法的使用及代码示例

x33g5p2x  于2022-01-16 转载在 其他  
字(10.3k)|赞(0)|评价(0)|浏览(189)

本文整理了Java中org.apache.spark.broadcast.Broadcast.value()方法的一些代码示例,展示了Broadcast.value()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Broadcast.value()方法的具体详情如下:
包路径:org.apache.spark.broadcast.Broadcast
类名称:Broadcast
方法名:value

Broadcast.value介绍

暂无

代码示例

代码示例来源:origin: OryxProject/oryx

/**
 * Samples negative (user, item) examples for one user by drawing random entries from
 * the broadcast list of all item IDs and keeping those the user is not already
 * associated with. The same item may be drawn, and therefore emitted, more than once.
 *
 * @param userIDsAndItemIDs a user ID paired with that user's positive item IDs
 * @return iterator over the sampled (userID, itemID) pairs
 */
@Override
 public Iterator<Tuple2<Integer,Integer>> call(
   Tuple2<Integer,Iterable<Integer>> userIDsAndItemIDs) {
  Integer userID = userIDsAndItemIDs._1();
  Collection<Integer> knownItemIDs = Sets.newHashSet(userIDsAndItemIDs._2());
  int target = knownItemIDs.size();
  Collection<Tuple2<Integer,Integer>> sampled = new ArrayList<>(target);
  List<Integer> candidates = allItemIDsBC.value();
  int numCandidates = candidates.size();
  // Aim for about as many negatives as positives, but never make more draws than
  // there are candidate items, so the loop always terminates.
  int draws = 0;
  while (draws < numCandidates && sampled.size() < target) {
   Integer candidate = candidates.get(random.nextInt(numCandidates));
   if (!knownItemIDs.contains(candidate)) {
    sampled.add(new Tuple2<>(userID, candidate));
   }
   draws++;
  }
  return sampled.iterator();
 }
});

代码示例来源:origin: databricks/learning-spark

/**
 * Re-keys a (call sign, count) pair by country: the country is looked up from the
 * call sign using the broadcast prefix table, and the count is carried over as-is.
 *
 * @param callSignCount a call sign paired with its count
 * @return the looked-up country paired with the same count
 */
public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
   String sign = callSignCount._1();
   String country = lookupCountry(sign, signPrefixes.value());
   // Parameterized constructor instead of the raw Tuple2 type, so the pair is type-checked
   return new Tuple2<String, Integer>(country, callSignCount._2());
  }}).reduceByKey(new SumInts());
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");

代码示例来源:origin: OryxProject/oryx

/**
 * Writes a features RDD as gzip-compressed text, one JSON-joined line per entry.
 * Each entry's integer key is translated to its string ID through the broadcast map.
 *
 * @param features pairs of index and feature vector
 * @param path output location
 * @param bIndexToID broadcast mapping from index to ID
 */
private static void saveFeaturesRDD(JavaPairRDD<Integer,float[]> features,
                  Path path,
                  Broadcast<? extends Map<Integer,String>> bIndexToID) {
 log.info("Saving features RDD to {}", path);
 features
   .map(indexAndFeatures -> TextUtils.joinJSON(Arrays.asList(
     bIndexToID.value().get(indexAndFeatures._1()),
     indexAndFeatures._2())))
   .saveAsTextFile(path.toString(), GzipCodec.class);
}

代码示例来源:origin: OryxProject/oryx

return new Tuple2<>(
  Long.valueOf(tokens[3]),
  new Rating(bUserIDToIndex.value().get(tokens[0]),
        bItemIDToIndex.value().get(tokens[1]),

代码示例来源:origin: OryxProject/oryx

/**
 * Converts a Java pair RDD of (ID, float vector) into a persisted RDD of
 * (index, double vector), exposed with {@code Object} keys.
 *
 * @param javaRDD pairs of string ID and float feature vector
 * @param bIdToIndex broadcast mapping from string ID to integer index
 * @return the converted RDD, persisted at MEMORY_AND_DISK
 */
private static RDD<Tuple2<Object,double[]>> readAndConvertFeatureRDD(
  JavaPairRDD<String,float[]> javaRDD,
  Broadcast<? extends Map<String,Integer>> bIdToIndex) {
 // Step 1: swap each string ID for its integer index
 JavaPairRDD<Integer,float[]> indexed = javaRDD.mapToPair(idAndVector ->
   new Tuple2<>(bIdToIndex.value().get(idAndVector._1()), idAndVector._2()));
 // Step 2: widen every float vector to a double vector
 JavaPairRDD<Integer,double[]> widened = indexed.mapValues(floats -> {
   double[] doubles = new double[floats.length];
   int pos = 0;
   for (float value : floats) {
    doubles[pos++] = value;
   }
   return doubles;
  });
 RDD<Tuple2<Integer,double[]>> scalaRDD = widened.rdd();
 // Mirrors the persistence level established by the ALS training methods
 scalaRDD.persist(StorageLevel.MEMORY_AND_DISK());
 // Keys are re-typed as Object via an unchecked cast; the elements are unchanged
 @SuppressWarnings("unchecked")
 RDD<Tuple2<Object,double[]>> objKeyRDD = (RDD<Tuple2<Object,double[]>>) (RDD<?>) scalaRDD;
 return objKeyRDD;
}

代码示例来源:origin: mahmoudparsian/data-algorithms-book

/**
 * Counts how many patients, built from the given values, pass the test configured
 * by the broadcast filter type and filter value threshold.
 *
 * @param values the raw values from which the patients map is built
 * @return the number of patients that passed the test
 */
@Override
  public Integer call(Iterable<String> values) {
   Map<String, PairOfDoubleInteger> patientsById = buildPatientsMap(values);
   // Filter settings are shared with this function through broadcast variables
   String type = (String) broadcastVarFilterType.value();
   Double threshold = (Double) broadcastVarFilterValueThreshold.value();
   int passedCount = getNumberOfPatientsPassedTheTest(patientsById, type, threshold);
   return passedCount;
  }
});

代码示例来源:origin: OryxProject/oryx

AppPMMLUtils.addExtension(pmml, "epsilon", epsilon);
addIDsExtension(pmml, "XIDs", userFeaturesRDD, bUserIndexToID.value());
addIDsExtension(pmml, "YIDs", itemFeaturesRDD, bItemIndexToID.value());
return pmml;

代码示例来源:origin: mahmoudparsian/data-algorithms-book

/**
 * Keeps a record only when its second comma-separated field equals the broadcast
 * reference value.
 *
 * @param record a comma-separated record
 * @return true to keep the record, false to drop it
 */
@Override
 public Boolean call(String record) {
  String expected = REF.value();
  String secondField = record.split(",")[1];
  // true: return this record; false: do not return it
  return expected.equals(secondField);
 }
});

代码示例来源:origin: mahmoudparsian/data-algorithms-book

/**
 * Emits, for every whitespace-separated token in the line, one
 * (token, (neighbor, 1)) pair per neighbor located within the broadcast window
 * on either side of the token. A token is never paired with its own position.
 *
 * @param line a line of whitespace-separated tokens
 * @return iterator over the generated (token, (neighbor, 1)) pairs
 */
@Override
  public java.util.Iterator<scala.Tuple2<String, scala.Tuple2<String, Integer>>> call(String line) throws Exception {
    List<Tuple2<String, Tuple2<String, Integer>>> list = new ArrayList<Tuple2<String, Tuple2<String, Integer>>>();
    String[] tokens = line.split("\\s");
    if (tokens.length == 0) {
      // No tokens means no pairs; return before touching the broadcast variable
      return list.iterator();
    }
    // The window size is loop-invariant: read the broadcast value once, not per token
    final int window = brodcastWindow.value();
    final int lastIndex = tokens.length - 1;
    for (int i = 0; i < tokens.length; i++) {
      // Clamp the neighborhood to the bounds of the token array
      int start = Math.max(0, i - window);
      int end = Math.min(lastIndex, i + window);
      for (int j = start; j <= end; j++) {
        if (j != i) {
          list.add(new Tuple2<String, Tuple2<String, Integer>>(tokens[i], new Tuple2<String, Integer>(tokens[j], 1)));
        }
      }
    }
    return list.iterator();
  }
}

代码示例来源:origin: mahmoudparsian/data-algorithms-book

/**
 * Splits a sequence into all of its overlapping substrings of the broadcast
 * length K, pairing each one with a count of 1.
 *
 * @param sequence the input sequence
 * @return iterator over (k-mer, 1) pairs, in order of starting position
 */
@Override
  public Iterator<Tuple2<String,Integer>> call(String sequence) {
   int kmerLength = broadcastK.value();
   List<Tuple2<String,Integer>> kmers = new ArrayList<Tuple2<String,Integer>>();
   // Number of start positions at which a full-length k-mer still fits
   int startLimit = sequence.length() - kmerLength + 1;
   for (int start = 0; start < startLimit; start++) {
     kmers.add(new Tuple2<String,Integer>(sequence.substring(start, kmerLength + start), 1));
   }
   return kmers.iterator();
  }
});

代码示例来源:origin: mahmoudparsian/data-algorithms-book

/**
 * Emits, for every whitespace-separated token in the line, one row
 * (token, neighbor, 1) per neighbor located within the broadcast window on either
 * side of the token. A token is never paired with its own position.
 *
 * @param line a line of whitespace-separated tokens
 * @return iterator over the generated rows
 */
@Override
  public Iterator<Row> call(String line) throws Exception {
    List<Row> list = new ArrayList<>();
    String[] tokens = line.split("\\s");
    if (tokens.length == 0) {
      // No tokens means no rows; return before touching the broadcast variable
      return list.iterator();
    }
    // The window size is loop-invariant: read the broadcast value once, not per token
    final int window = brodcastWindow.value();
    final int lastIndex = tokens.length - 1;
    for (int i = 0; i < tokens.length; i++) {
      // Clamp the neighborhood to the bounds of the token array
      int start = Math.max(0, i - window);
      int end = Math.min(lastIndex, i + window);
      for (int j = start; j <= end; j++) {
        if (j != i) {
          list.add(RowFactory.create(tokens[i], tokens[j], 1));
        }
      }
    }
    return list.iterator();
  }
});

代码示例来源:origin: mahmoudparsian/data-algorithms-book

/**
 * Builds a local top-N for one partition: a sorted map from count to string that
 * retains at most the N largest counts seen, N being the broadcast value.
 * Because the map is keyed by count, an entry with a count that is already present
 * replaces the earlier entry for that count.
 *
 * @param iter the (string, count) pairs of the partition
 * @return iterator over a single element, the partition's top-N map
 */
@Override
  public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
    final int limit = topN.value();
    SortedMap<Integer, String> best = new TreeMap<Integer, String>();
    while (iter.hasNext()) {
     Tuple2<String,Integer> entry = iter.next();
     best.put(entry._2, entry._1);
     // One entry too many: evict the smallest count
     if (best.size() > limit) {
       best.remove(best.firstKey());
     }
    }
    return Collections.singletonList(best).iterator();
  }
});

代码示例来源:origin: mahmoudparsian/data-algorithms-book

/**
 * Turns a record of the form "geneID,referenceType;patientIDAndGeneValue" into a
 * key-value pair, but only when its reference type equals the broadcast reference
 * type. Other records yield the shared TUPLE_2_NULL marker, which is expected to be
 * filtered out later.
 *
 * @param record the ';'-separated input record
 * @return (geneIDAndReferenceType, patientIDAndGeneValue), or TUPLE_2_NULL
 */
@Override
 public Tuple2<String, String> call(String record) {
   String[] parts = StringUtils.split(record, ";");
   String geneAndRefType = parts[0];
   String patientAndValue = parts[1];
   // geneFields[0] is the gene ID, geneFields[1] is the reference type
   String[] geneFields = StringUtils.split(geneAndRefType, ",");
   String wantedType = (String) broadcastVarReferenceType.value();
   if (!geneFields[1].equals(wantedType)) {
     // Not the requested reference type: nothing to count for this record
     return TUPLE_2_NULL;
   }
   // Matching reference type: hand the pair on to the reducer
   return new Tuple2<String, String>(geneAndRefType, patientAndValue);
 }
});

代码示例来源:origin: mahmoudparsian/data-algorithms-book

/**
 * Builds a local top-N of k-mers for one partition: a sorted map from frequency to
 * k-mer that retains at most the N largest frequencies seen, N being the broadcast
 * value. Because the map is keyed by frequency, a k-mer with a frequency that is
 * already present replaces the earlier k-mer for that frequency.
 * The resulting map is also printed to standard output.
 *
 * @param iter the (k-mer, frequency) pairs of the partition
 * @return iterator over a single element, the partition's top-N map
 */
@Override
   public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
     int limit = broadcastN.value();
     SortedMap<Integer, String> best = new TreeMap<Integer, String>();
     while (iter.hasNext()) {
      Tuple2<String,Integer> entry = iter.next();
      String sequence = entry._1;
      int count = entry._2;
      best.put(count, sequence);
      // One entry too many: evict the smallest frequency
      if (best.size() > limit) {
        best.remove(best.firstKey());
      }
     }
     System.out.println("topN="+best);
     return Collections.singletonList(best).iterator();
   }
});

代码示例来源:origin: mahmoudparsian/data-algorithms-book

/**
 * Computes the distance between the two records of a pair and keys the result by
 * the first record's ID.
 * Both records are ';'-separated: the first is "recordID;attributes", the second is
 * "recordID;classificationID;attributes". The broadcast value d is passed to the
 * distance calculation together with the two attribute strings.
 *
 * @param cartRecord the pair of records to compare
 * @return (first record's ID, (distance, second record's classification ID))
 */
@Override
 public Tuple2<String,Tuple2<Double,String>> call(Tuple2<String,String> cartRecord) {
  String[] rFields = cartRecord._1.split(";");
  String rId = rFields[0];
  String rAttributes = rFields[1];
  // sFields[0], the second record's own ID, is not needed here
  String[] sFields = cartRecord._2.split(";");
  String sClass = sFields[1];
  String sAttributes = sFields[2];
  Integer d = broadcastD.value();
  double distance = Util.calculateDistance(rAttributes, sAttributes, d);
  Tuple2<Double,String> distanceAndClass = new Tuple2<Double,String>(distance, sClass);
  return new Tuple2<String,Tuple2<Double,String>>(rId, distanceAndClass);
 }
});

代码示例来源:origin: mahmoudparsian/data-algorithms-book

Map<Tuple2<String,String>, Double> CLASSIFIER = broadcastClassifier.value();
List<String> CLASSES = broadcastClasses.value();

代码示例来源:origin: mahmoudparsian/data-algorithms-book

System.out.println("genes: filterValueThreshold="+filterValueThreshold);
String referenceType = (String) broadcastVarReferenceType.value();
String filterType = (String) broadcastVarFilterType.value();
Double filterValueThreshold = (Double) broadcastVarFilterValueThreshold.value();

代码示例来源:origin: mahmoudparsian/data-algorithms-book

/**
 * Runs a t-test for one gene over the broadcast time table: the times of biosets
 * that belong to the gene form one sample, the times of all other biosets form
 * the other.
 *
 * @param biosets the bioset IDs associated with the gene
 * @return the result of the t-test between the two samples
 */
@Override
  public Double call(Iterable<String> biosets) {
    Set<String> ownBiosets = new HashSet<String>();
    for (String id : biosets) {
     ownBiosets.add(id);
    }
    // The shared time table is iterated in full, entry by entry
    Map<String, Double> timetable = broadcastTimeTable.value();
    // The two samples handed to the t-test
    List<Double> present = new ArrayList<Double>();
    List<Double> absent = new ArrayList<Double>();
    for (Map.Entry<String, Double> row : timetable.entrySet()) {
      String id = row.getKey();
      Double time = row.getValue();
      List<Double> sample = ownBiosets.contains(id) ? present : absent;
      sample.add(time);
    }
    double result = MathUtil.ttest(present, absent);
    return result;
  }
});

代码示例来源:origin: apache/tinkerpop

/**
 * Returns the value stored under the given memory key.
 * While {@code inExecute} is set, only keys flagged as broadcast are readable and
 * the value comes from the broadcast map; otherwise it comes from
 * {@code sparkMemory}.
 *
 * @param key the memory key to read
 * @return the value held for the key
 * @throws IllegalArgumentException declared by the signature; every failure path
 *         throws whatever {@code Memory.Exceptions.memoryDoesNotExist(key)} creates
 */
@Override
public <R> R get(final String key) throws IllegalArgumentException {
  // Unknown key, or a non-broadcast key requested during execution
  if (!this.memoryComputeKeys.containsKey(key)
      || (this.inExecute && !this.memoryComputeKeys.get(key).isBroadcast())) {
    throw Memory.Exceptions.memoryDoesNotExist(key);
  }
  final ObjectWritable<R> writable = (ObjectWritable<R>) (this.inExecute ? this.broadcast.value().get(key) : this.sparkMemory.get(key).value());
  // A missing or empty holder is reported the same way as an unknown key
  if (null == writable || writable.isEmpty()) {
    throw Memory.Exceptions.memoryDoesNotExist(key);
  }
  return writable.get();
}

代码示例来源:origin: mahmoudparsian/data-algorithms-book

/**
 * Classifies one record from its (distance, classification) neighbors: keeps the
 * k nearest, k being the broadcast value, counts their classifications and picks
 * the classification by majority.
 *
 * @param neighbors (distance, classification ID) pairs for the record
 * @return the selected classification ID
 */
@Override
 public String call(Iterable<Tuple2<Double,String>> neighbors) {
   Integer k = broadcastK.value();
   // Step 1: narrow the neighbors down to the k nearest
   SortedMap<Double, String> closest = Util.findNearestK(neighbors, k);
   // Step 2: count how often each classification occurs among them
   Map<String, Integer> votes = Util.buildClassificationCount(closest);
   // Step 3: the classification with the majority of votes wins
   return Util.classifyByMajority(votes);
 }
});

相关文章