A simple experiment with Hadoop map/reduce

Quite often, especially when processing big data, we want a single MapReduce pass to solve several problems at once, so that we avoid reading the data a second time. For example, when doing text clustering/classification, the mapper reads the corpus and segments it into words, and at the same time it has to compute each term's term frequency (TF) and its document frequency (DF). The former is effectively a vector per term, recording that term's frequency in each of the N documents; the latter is just a non-negative integer. This is where a special Writable class comes in handy: GenericWritable.

Usage: subclass GenericWritable and register every Writable type you want to emit as a value in its static CLASSES array. In the TermMapper and TermReducer below, my values use three types: ArrayWritable, IntWritable, and my own TFWritable, so all three have to be added to TermWritable's CLASSES. Under the hood, GenericWritable serializes a one-byte index into CLASSES followed by the wrapped instance, which is how the receiving side knows which concrete type to reinstantiate.

package redpoll.examples;  
  
import org.apache.hadoop.io.GenericWritable;  
import org.apache.hadoop.io.Writable;  
  
/** 
 * Generic Writable wrapper for term values. Every Writable type this 
 * wrapper may carry must be registered in CLASSES below. 
 */  
public class TermWritable extends GenericWritable {  
  private static Class<? extends Writable>[] CLASSES = null;  
  
  static {  
    CLASSES = (Class<? extends Writable>[]) new Class[] {  
        org.apache.hadoop.io.ArrayWritable.class,  
        org.apache.hadoop.io.IntWritable.class,  
        redpoll.examples.TFWritable.class  
        };  
  }  
  
  public TermWritable() {  
  }  
  
  public TermWritable(Writable instance) {  
    set(instance);  
  }  
  
  @Override  
  protected Class<? extends Writable>[] getTypes() {  
    return CLASSES;  
  }  
}  
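
TFWritable is my own class and its source isn't included in this post. For reference, a minimal sketch of what it might look like, assuming it simply pairs a document id with a term frequency (the field names here are assumptions):

package redpoll.examples;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

/**
 * A <documentId, tf> pair. A sketch only; the original TFWritable is not
 * shown in this post, so the fields here are assumptions.
 */
public class TFWritable implements Writable {
  private LongWritable documentId = new LongWritable();
  private IntWritable tf = new IntWritable();

  /**
   * A no-arg constructor is required so that GenericWritable and
   * ArrayWritable can instantiate this class reflectively when deserializing.
   */
  public TFWritable() {
  }

  public TFWritable(LongWritable documentId, IntWritable tf) {
    this.documentId = documentId;
    this.tf = tf;
  }

  public void write(DataOutput out) throws IOException {
    documentId.write(out);
    tf.write(out);
  }

  public void readFields(DataInput in) throws IOException {
    documentId.readFields(in);
    tf.readFields(in);
  }
}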

When the mapper collects its output, it wraps (wraps) the Writable it wants to emit in the TermWritable we just defined:

package redpoll.examples;  
  
import java.io.IOException;  
import java.io.StringReader;  
  
import org.apache.commons.logging.Log;  
import org.apache.commons.logging.LogFactory;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapred.JobConf;  
import org.apache.hadoop.mapred.MapReduceBase;  
import org.apache.hadoop.mapred.Mapper;  
import org.apache.hadoop.mapred.OutputCollector;  
import org.apache.hadoop.mapred.Reporter;  
import org.apache.lucene.analysis.Analyzer;  
import org.apache.lucene.analysis.Token;  
import org.apache.lucene.analysis.TokenStream;  
import org.apache.lucene.analysis.standard.StandardAnalyzer;  
  
/** 
 * Does word segmentation and counts term TFs and DFs.<p> 
 * in: key is a document id, value is a document instance. <br> 
 * out: 
 * <li>key is a term, value is a <documentId, tf> pair</li> 
 * <li>key is a term, value is the document frequency contribution for the key</li> 
 */  
public class TermMapper extends MapReduceBase implements  
    Mapper<LongWritable, Document, Text, TermWritable> {  
  private static final Log log = LogFactory.getLog(TermMapper.class  
      .getName());  
    
  /* analyzer for words segmentation */  
  private Analyzer analyzer = null;  
     
  /* frequency weight for document title */  
  private IntWritable titleWeight = new IntWritable(2);  
  /* frequency weight for document content */  
  private IntWritable contentWeight = new IntWritable(1);  
  
    
  public void map(LongWritable key, Document value,  
      OutputCollector<Text, TermWritable> output, Reporter reporter)  
      throws IOException {  
    doMap(key, value.getTitle(), titleWeight, output, reporter);  
    doMap(key, value.getContent(), contentWeight, output, reporter);  
  }  
    
  private void doMap(LongWritable key, String value, IntWritable weight,  
      OutputCollector<Text, TermWritable> output, Reporter reporter)  
      throws IOException {  
    // do words segmentation  
    TokenStream ts = analyzer.tokenStream("dummy", new StringReader(value));  
    Token token = new Token();  
    while ((token = ts.next(token)) != null) {  
      String termString = new String(token.termBuffer(), 0, token.termLength());  
      Text term = new Text(termString);  
      // <term, <documentId,tf>>  
      TFWritable tf = new TFWritable(key, weight);  
      output.collect(term, new TermWritable(tf)); // wrap then collect  
      // <term, weight>  
      output.collect(term, new TermWritable(weight)); // wrap then collect  
    }  
  }  
      
  @Override  
  public void configure(JobConf job) {  
    String analyzerName = job.get("redpoll.text.analyzer");  
    try {  
      if (analyzerName != null)  
        analyzer = (Analyzer) Class.forName(analyzerName).newInstance();  
    } catch (Exception excp) {  
      excp.printStackTrace();  
    }  
    if (analyzer == null)  
      analyzer = new StandardAnalyzer();  
  }  
  
}  

When the reducer wants the data back, it can unwrap (unwrap) it:

package redpoll.examples;  
  
import java.io.IOException;  
import java.util.ArrayList;  
import java.util.Iterator;  
  
import org.apache.commons.logging.Log;  
import org.apache.commons.logging.LogFactory;  
import org.apache.hadoop.io.ArrayWritable;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.Writable;  
import org.apache.hadoop.mapred.MapReduceBase;  
import org.apache.hadoop.mapred.OutputCollector;  
import org.apache.hadoop.mapred.Reducer;  
import org.apache.hadoop.mapred.Reporter;  
  
/** 
 * Forms a tf vector and calculates the df for each term. 
 */  
public class TermReducer extends MapReduceBase implements Reducer<Text, TermWritable, Text, Writable> {  
    
  private static final Log log = LogFactory.getLog(TermReducer.class.getName());  
    
  public void reduce(Text key, Iterator<TermWritable> values,  
      OutputCollector<Text, Writable> output, Reporter reporter)  
      throws IOException {  
    ArrayList<TFWritable> tfs = new ArrayList<TFWritable>();  
    int sum = 0;  
//    log.info("term:" + key.toString());  
    while (values.hasNext()) {  
      Writable value = values.next().get(); // unwrap  
      if (value instanceof TFWritable) {  
        tfs.add((TFWritable) value);  
      } else {  
        sum += ((IntWritable) value).get();  
      }  
    }  
      
    TFWritable writables[] = new TFWritable[tfs.size()];  
    ArrayWritable aw = new ArrayWritable(TFWritable.class, tfs.toArray(writables));  
    // wrap again  
    output.collect(key, new TermWritable(aw));   
    output.collect(key, new TermWritable(new IntWritable(sum)));  
  }  
  
}  
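
The driver isn't shown in this post either. For completeness, a minimal sketch of the job wiring might look like the following (the class name TermJob is made up, and this uses the default single-output path rather than my custom OutputFormat described below):

package redpoll.examples;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class TermJob {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(TermJob.class);
    conf.setJobName("term tf/df");

    // The InputFormat producing <LongWritable, Document> records is not
    // shown in this post; plug in your own here.
    // conf.setInputFormat(DocumentInputFormat.class);

    // Optional: pick the analyzer that TermMapper.configure() instantiates.
    // conf.set("redpoll.text.analyzer", "org.apache.lucene.analysis.cjk.CJKAnalyzer");

    conf.setMapperClass(TermMapper.class);
    conf.setReducerClass(TermReducer.class);

    // Both map and reduce emit <Text, TermWritable> pairs.
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(TermWritable.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}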

The reducer's collect calls don't strictly have to wrap values in TermWritable again; I only do it because I redefined the OutputFormat to write the two kinds of records to two different files, each with its own output type.
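
My custom OutputFormat isn't shown here. As an alternative, the stock MultipleOutputs helper in the old mapred API can produce the same two-file split without any rewrapping. A sketch (the class name and the named outputs "vectors" and "dfs" are made-up names):

// Added to the driver before submitting the job (e.g. in TermJob above):
//   MultipleOutputs.addNamedOutput(conf, "vectors",
//       SequenceFileOutputFormat.class, Text.class, ArrayWritable.class);
//   MultipleOutputs.addNamedOutput(conf, "dfs",
//       SequenceFileOutputFormat.class, Text.class, IntWritable.class);

package redpoll.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

public class TermReducerWithMultipleOutputs extends MapReduceBase
    implements Reducer<Text, TermWritable, Text, Writable> {

  private MultipleOutputs mos;

  @Override
  public void configure(JobConf job) {
    mos = new MultipleOutputs(job);
  }

  public void reduce(Text key, Iterator<TermWritable> values,
      OutputCollector<Text, Writable> output, Reporter reporter)
      throws IOException {
    ArrayList<TFWritable> tfs = new ArrayList<TFWritable>();
    int sum = 0;
    while (values.hasNext()) {
      Writable value = values.next().get(); // unwrap
      if (value instanceof TFWritable)
        tfs.add((TFWritable) value);
      else
        sum += ((IntWritable) value).get();
    }
    TFWritable[] writables = new TFWritable[tfs.size()];
    ArrayWritable aw = new ArrayWritable(TFWritable.class, tfs.toArray(writables));
    // No TermWritable wrapping needed: each named output has its own value type.
    mos.getCollector("vectors", reporter).collect(key, aw);
    mos.getCollector("dfs", reporter).collect(key, new IntWritable(sum));
  }

  @Override
  public void close() throws IOException {
    mos.close();
  }
}

With named outputs, each file gets its own value class, so the reduce side would not need GenericWritable at all.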

