In Spark Streaming, when two `reduceByKeyAndWindow` operations are chained, certain combinations of window and slide durations cause the job to be scheduled normally while the output is incomplete (some data silently goes missing).
Test code:
```java
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;

import com.google.common.collect.Lists;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class TwoReduceWindow {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("TwoReduceWindow");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

        // Generate 1000 RDDs of random integers to feed the queue stream.
        System.out.println("Generate rdds ...");
        Queue<JavaRDD<Integer>> queue = new LinkedList<JavaRDD<Integer>>();
        Random rand = new Random();
        for (int i = 0; i < 1000; i++) {
            List<Integer> list = Lists.newArrayList();
            for (int j = 0; j < 1000; j++) {
                list.add(rand.nextInt(100));
            }
            JavaRDD<Integer> rdd = jssc.sparkContext().parallelize(list);
            queue.add(rdd);
        }

        // Build the stream: a queue-backed DStream mapped to (n, n % 10) pairs.
        System.out.println("Generate streaming...");
        JavaDStream<Integer> stream = jssc.queueStream(queue);
        JavaPairDStream<Integer, Integer> pairStream = stream.mapToPair(
            new PairFunction<Integer, Integer, Integer>() {
                @Override
                public Tuple2<Integer, Integer> call(Integer n) {
                    return new Tuple2<Integer, Integer>(n, n % 10);
                }
            });

        // Window/slide durations in milliseconds; these are the knobs varied in the tests below.
        int window01 = 2000;
        int slide01 = 1000;
        int window02 = 4000;
        int slide02 = 2000;

        // First windowed reduce over the pair stream.
        JavaPairDStream<Integer, Integer> rdStream = pairStream.reduceByKeyAndWindow(
            new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer m, Integer n) throws Exception {
                    return m + n;
                }
            },
            new Duration(window01),
            new Duration(slide01));

        // Second windowed reduce, chained on the output of the first.
        JavaPairDStream<Integer, Integer> moreRdStream = rdStream.reduceByKeyAndWindow(
            new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer m, Integer n) throws Exception {
                    return m + n;
                }
            },
            new Duration(window02),
            new Duration(slide02));

        // Print every tuple so missing output is easy to spot.
        moreRdStream.foreachRDD(new Function<JavaPairRDD<Integer, Integer>, Void>() {
            @Override
            public Void call(JavaPairRDD<Integer, Integer> rdd) throws Exception {
                rdd.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
                    @Override
                    public void call(Tuple2<Integer, Integer> tuple) throws Exception {
                        System.out.println("tuple: " + tuple.toString());
                    }
                });
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
```
The table below shows a set of test runs:
| window01 | slide01 | window02 | slide02 | Result |
| --- | --- | --- | --- | --- |
| 2000 | 1000 | 4000 | 2000 | correct output |
| 2000 | 2000 | 4000 | 2000 | data missing |
| 2000 | 2000 | 4000 | 4000 | data missing |
| 2000 | 2000 | 8000 | 4000 | correct output |
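To see why the missing data looks like a bug rather than a misuse of the parameters, it helps to model what each second-level window *should* cover. The sketch below is a hypothetical helper (not part of the test program above and not a Spark API), assuming the usual DStream window semantics where a window ending at time `t` covers the parent RDDs whose time lies in `(t - window, t]`. It counts, for each source batch, how many times that batch is expected to contribute to the chained window's output at a steady-state time `T`; in all four configurations from the table, the expected counts are non-zero for the batches inside the window span, which suggests window alignment alone does not explain the loss.

```java
// Hypothetical helper class: models the expected coverage of a second-level
// window over a first-level windowed stream, assuming a window ending at time t
// covers parent RDDs with time in (t - window, t]. Not part of Spark itself.
public class WindowCoverage {

    // How many times the source batch ending at time b should be counted
    // in the second-level window output at time T.
    static int expectedCount(long b, long T, long w1, long s1, long w2, long s2) {
        int count = 0;
        // First-level windowed outputs occur at every multiple of s1.
        for (long t = s1; t <= T; t += s1) {
            boolean inSecondWindow = t > T - w2 && t <= T;      // t inside (T - w2, T]
            boolean batchInFirstWindow = b > t - w1 && b <= t;  // batch b inside (t - w1, t]
            if (inSecondWindow && batchInFirstWindow) count++;
        }
        return count;
    }

    public static void main(String[] args) {
        long batch = 1000;   // batch interval from the test program
        long T = 16000;      // a steady-state output time shared by all four configurations
        long[][] configs = {
            {2000, 1000, 4000, 2000},
            {2000, 2000, 4000, 2000},
            {2000, 2000, 4000, 4000},
            {2000, 2000, 8000, 4000},
        };
        for (long[] c : configs) {
            StringBuilder counts = new StringBuilder();
            // Inspect every source batch that could fall inside the second window at T.
            for (long b = T - c[2] - c[0] + batch; b <= T; b += batch) {
                counts.append(expectedCount(b, T, c[0], c[1], c[2], c[3])).append(' ');
            }
            System.out.println("window01=" + c[0] + " slide01=" + c[1]
                    + " window02=" + c[2] + " slide02=" + c[3]
                    + " expected counts per batch: " + counts.toString().trim());
        }
    }
}
```

Note that when windows overlap (e.g. window01=2000 with slide01=1000), a batch is legitimately counted more than once by the chained reduce, so a "correct" run is one whose observed multiplicities match these expected counts, not necessarily one where every batch appears exactly once.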
Both Spark-1.1 and Spark-1.2 exhibit this problem; on Spark-1.3 all four configurations produce correct output.
This is presumably a bug in the two earlier releases.