Original (buggy) code:
JavaRDD<ArticleReply> javaRdd = rdd.flatMap(new FlatMapFunction<String, ArticleReply>() {
    private static final long serialVersionUID = 10000L;
    // BUG: the list is a field of the function object, shared across all calls
    List<ArticleReply> newList = new ArrayList<ArticleReply>();

    public Iterable<ArticleReply> call(String line) throws Exception {
        String[] splits = line.split("\t");
        ArticleReply bean = new ArticleReply();
        bean.setAreaId(splits[0]);
        bean.setAgent(Integer.parseInt(splits[1]));
        bean.setSerial(splits[2]);
        newList.add(bean);
        return newList;
    }
});
Correct version:
JavaRDD<ArticleReply> javaRdd = rdd.flatMap(new FlatMapFunction<String, ArticleReply>() {
    private static final long serialVersionUID = 10000L;

    public Iterable<ArticleReply> call(String line) throws Exception {
        // A fresh list is created on every call, so only this call's bean is returned
        List<ArticleReply> newList = new ArrayList<ArticleReply>();
        String[] splits = line.split("\t");
        ArticleReply bean = new ArticleReply();
        bean.setAreaId(splits[0]);
        bean.setAgent(Integer.parseInt(splits[1]));
        bean.setSerial(splits[2]);
        newList.add(bean);
        return newList;
    }
});
In the buggy version, the list is declared and initialized outside the call method, as a field of the anonymous function object. Each invocation of call therefore adds one more bean to the same list and returns the entire list, so over N input lines Spark receives 1 + 2 + 3 + ... + N objects instead of N. This greatly inflates Spark's memory usage and can cause the job to run out of memory.
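The accumulation effect can be seen without Spark at all. The sketch below (hypothetical class and method names, not from the original) simulates N = 4 invocations of both patterns and counts how many objects each one emits in total:

```java
import java.util.ArrayList;
import java.util.List;

public class AccumulationDemo {
    // Buggy pattern: the list outlives a single call, so earlier
    // elements are re-emitted on every subsequent call.
    static List<String> shared = new ArrayList<String>();

    static List<String> buggyCall(String line) {
        shared.add(line);
        return shared; // returns everything emitted so far
    }

    // Correct pattern: a fresh list per call emits exactly one element.
    static List<String> correctCall(String line) {
        List<String> fresh = new ArrayList<String>();
        fresh.add(line);
        return fresh;
    }

    public static void main(String[] args) {
        int buggyTotal = 0, correctTotal = 0;
        for (int i = 0; i < 4; i++) {
            buggyTotal += buggyCall("line" + i).size();
            correctTotal += correctCall("line" + i).size();
        }
        // With 4 calls: buggy emits 1+2+3+4 = 10 objects, correct emits 4
        System.out.println(buggyTotal + " " + correctTotal);
    }
}
```

With 4 input lines the buggy pattern already hands back 10 objects; at N lines that grows as N(N+1)/2, which is why a large input quickly exhausts executor memory.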