您好,歡迎來到網暖!
?
當前位置:網暖 » 站長資訊 » 建站基礎 » 網絡技術 » 文章詳細 訂閱RssFeed

二、MapReduce基本編程規范

來源:網絡整理 瀏覽:319次 時間:2020-02-24

[TOC]

一、MapReduce編程基本組成

編寫MapReduce的程序有至少三個必不可少的部分:mapper,reducer,driver。可選的有 partitioner,combiner
而且mapper的輸入輸出、reducer的輸入輸出都是key value型的,所以要求我們在編寫mapper和reducer時,必須實現明確這4個鍵值對中的8種數據類型,而且必須還是hadoop的可序列化類型。同時還需要注意的是,map的輸出其實就是reduce的輸入,所以包括的數據類型是一樣的。

1、map階段

編寫基本流程
1)自定義map類,需要繼承 Mapper這個類
2)繼承Mapper 的時候,需要指定輸入和輸出的鍵值對中的類型
3)必須重寫繼承自父類的map() 方法
4)上面重寫的map() 方法是每個map task對每一個輸入到mapper中的鍵值對都會調用處理一次。

基本編寫實例如下:

/*指定Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 這4個類型分別為:LongWritable, Text, Text, IntWritable,相當于普通類型:long,string,string,int*/public class TestMapper extends Mapper<LongWritable, Text, Text, IntWritable> {    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        這里是map方法 處理邏輯    }}
2、reduce階段

基本編寫流程
1)自定義reduce類,需要繼承 Reducer這個類
2)繼承Reducer的時候,需要指定輸入和輸出的鍵值對中的類型
3)必須重寫繼承自父類的reduce() 方法
4)上面重寫的reduce() 方法是每個reduer task對每一個輸入到reducer中的鍵值對都會調用處理一次。

基本編寫實例如下:

/*指定Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 這4個類型分別為:Text, IntWritable, Text, IntWritable,相當于普通類型:string,int,string,int*/public class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {    protected void reduce(Text key,                          Iterable<IntWritable> values,                          Context context) throws IOException, InterruptedException {        這里是reduce方法 處理邏輯    }}
3、driver階段

這個部分是用于配置job對象的各種必須配置信息,配置完成后,將job提交給yarn執行
具體配置啥下面直接上例子看好了。主要起到調度map和reduce任務執行的作用

4、partitioner階段

這個階段主要是對map階段的輸出進行分區,而map的分區數直接決定reduce task的數量(一般來說是一對一),編寫流程如下:
1)自定義分區類,繼承 Partitioner<key, value>
2)繼承Partitioner的時候,處理的輸入的鍵值對類型
3)必須重寫繼承自父類的getPartition() 方法
4)上面重寫的getPartition() () 方法是每個maptask對每一個輸入的鍵值對都會調用處理一次。
5)根據分區規則,返回0~n,表示分區格式為0~n

編寫案例如下:

public class WordCountPartitioner extends Partitioner<Text, IntWritable> {    @Override    public int getPartition(Text text, IntWritable intWritable, int i) {        判斷條件1:        return 0;        判斷條件2:        return 1;        .......        return n;    }}
5、combiner

combiner不是一個獨立的階段,它其實是包含在map階段中的。map本身輸出的鍵值對中,每個鍵值對的value都是1,就算是一樣的key,也是獨立一個鍵值對。如果重復的鍵值對越多,那么將map輸出傳遞到reduce的過程中,就會占用很多帶寬資源。優化的方法就是每個map輸出時,先在當前map task下進行局部合并匯總,減少重復可以的出現。即

<king,1> <>king,1>  這種一樣的key的,就會合并成 <king,2>這樣就會減少傳輸的數據量

所以其實由此可以知道,其實combiner的操作和reduce的操作是一樣的,只不過一個是局部,一個是全局。簡單的做法就是,直接將reducer作為combiner類傳入job,如:

job.setCombinerClass(WordCountReducer.class);

我們可以看看這個方法的源碼:

public void setCombinerClass(Class<? extends Reducer> cls) throws IllegalStateException {        this.ensureState(Job.JobState.DEFINE);        //看到沒,那個  Reducer.class        this.conf.setClass("mapreduce.job.combine.class", cls, Reducer.class);    }

可以清楚看到設置combine class時,可以看到多態的類型設置就是 Reducer 類型的,從這里也可以更加確定 combiner 的操作和 reducer的就是一樣的。

二、wordcount編程實例

下面開始用wordcount作為例子編寫一個完整的MapReduce程序

1、mapper
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {    //setup 和 clean 方法不是必須的    @Override    protected void setup(Context context) throws IOException, InterruptedException {        //最先執行        //System.out.println("this is setup");    }    @Override    protected void cleanup(Context context) throws IOException, InterruptedException {        //執行完map之后執行        //System.out.println("this is cleanup");    }    //這里創建一個臨時對象,用于保存中間值    Text k = new Text();    IntWritable v = new IntWritable();    /**     *     *     * @param key     * @param value     * @param context  用于連接map和reduce上下文,通過這個對象傳遞map的結果給reduce     * @throws IOException     * @throws InterruptedException     */    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        //System.out.println("開始map=====================");        //1.value是讀取到的一行字符串,要將其轉換為java中的string進行處理,即反序列化        String line = value.toString();        //2.切分數據        String[] words = line.split(" ");        //3.輸出map結構, <單詞,個數>的形式,寫入的時候需將普通類型轉為序列化類型        /**         * 兩種寫法:         * 1) context.write(new Text(word), new IntWritable(1));         *     缺點:每次都會創建兩個對象,最后會造成創建了很多臨時對象         *         * 2)Text k = new Text();         *    IntWritable v = new IntWritable();         *         *    for {         *       k.set(word);         *       v.set(1);         *       context.write(k, v);         *    }         *         *    這種方法好處就是,對象只創建了一次,后續只是通過修改對象內部的值的方式傳遞,無需重復創建多個對象         */        for (String word:words) {            //轉換普通類型為可序列化類型            k.set(word);            v.set(1);            //寫入到上下文對象中            context.write(k, v);        }    }}
2、reducer
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {    /**     * 這里的 Iterable<IntWritable> values 之所以是一個可迭代的對象,     * 是因為從map傳遞過來的數據經過合并了,如:     * (HDFS,1),(HDFS,1)合并成 (HDFS,[1,1]) 這樣的形式,所以value可以通過迭代方式獲取其中的值     *     */    IntWritable counts = new IntWritable();    @Override    protected void reduce(Text key,                          Iterable<IntWritable> values,                          Context context) throws IOException, InterruptedException {        //1.初始化次數        int count = 0;        //2.匯總同一個key中的個數        for (IntWritable value: values) {            count += value.get();        }        //3.輸出reduce        counts.set(count);        context.write(key, counts);    }}
3、driver
public class WordCountDriver {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        //這里只是方便在ide下直接運行,如果是在命令行下直接輸入輸入和輸出文件路徑即可        args = new String[]{"G:\\test2\\", "G:\\testmap6\\"};        //1.獲取配置對象        Configuration conf = new Configuration();        //2.獲取job對象        Job job = Job.getInstance(conf);        //3.分別給job指定driver,map,reducer的類        job.setJarByClass(WordCountDriver.class);        job.setMapperClass(WordCountMapper.class);        job.setReducerClass(WordCountReducer.class);        //4.分別指定map和reduce階段輸出的類型        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(IntWritable.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);         //這里可以設置分區類,需要額外編寫分區實現類//        job.setPartitionerClass(WordCountPartitioner.class);//        job.setNumReduceTasks(2);        //設置預合并類        //job.setCombinerClass(WordCountReducer.class);        //設置inputFormat類,大量小文件優化,不設置默認使用 TextInputFormat        job.setInputFormatClass(CombineTextInputFormat.class);        CombineTextInputFormat.setMaxInputSplitSize(job,3* 1024 * 1024);        CombineTextInputFormat.setMinInputSplitSize(job, 2 * 1024 * 1024);        //5.數據輸入來源以及結果的輸出位置        // 輸入的時候會根據數據源的情況自動map切片,形成切片信息(或者叫切片方案)        FileInputFormat.setInputPaths(job, new Path(args[0]));        FileOutputFormat.setOutputPath(job, new Path(args[1]));        //以上就是將一個job的配置信息配置完成后,下面就提交job,hadoop將跟就job的配置執行job        //6.提交job任務,這個方法相當于 job.submit()之后,然后等待執行完成        //任務配置信息是提交至yarn的  MRappmanager        job.waitForCompletion(true);    }}

推薦站點

  • 騰訊騰訊

    騰訊網(www.QQ.com)是中國瀏覽量最大的中文門戶網站,是騰訊公司推出的集新聞信息、互動社區、娛樂產品和基礎服務為一體的大型綜合門戶網站。騰訊網服務于全球華人用戶,致力成為最具傳播力和互動性,權威、主流、時尚的互聯網媒體平臺。通過強大的實時新聞和全面深入的信息資訊服務,為中國數以億計的互聯網用戶提供富有創意的網上新生活。

    www.qq.com
  • 搜狐搜狐

    搜狐網是全球最大的中文門戶網站,為用戶提供24小時不間斷的最新資訊,及搜索、郵件等網絡服務。內容包括全球熱點事件、突發新聞、時事評論、熱播影視劇、體育賽事、行業動態、生活服務信息,以及論壇、博客、微博、我的搜狐等互動空間。

    www.sohu.com
  • 網易網易

    網易是中國領先的互聯網技術公司,為用戶提供免費郵箱、游戲、搜索引擎服務,開設新聞、娛樂、體育等30多個內容頻道,及博客、視頻、論壇等互動交流,網聚人的力量。

    www.163.com
  • 新浪新浪

    新浪網為全球用戶24小時提供全面及時的中文資訊,內容覆蓋國內外突發新聞事件、體壇賽事、娛樂時尚、產業資訊、實用信息等,設有新聞、體育、娛樂、財經、科技、房產、汽車等30多個內容頻道,同時開設博客、視頻、論壇等自由互動交流空間。

    www.sina.com.cn
  • 百度一下百度一下

    百度一下,你就知道

    www.baidu.com
?
陕西11选5走势图前3直