프로그래밍/Hadoop ETC 2015. 3. 28. 11:53
하둡에서 데이터를 추출하는 가장 기본적인? 수동적인 방법인 mapreduce 를 java 로 구현한다. 

하둡의 기본 설치 및 configuration은 생략 맵과 리듀스 구조 하둡 기본 커맨드 http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html // 

fs 파일 io 와 관련된 커맨드 http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/CommandsManual.html ex) hadoop fs -mkdir '/tmp/temp' // 

job 실행 

ex)hadoop jar temp.jar

Job 구조 

 클라이언트(mr 잡을 실행하는 머쉰)에서 job을 시작하게 되면 소스에서 지정한 하둡설정 파일 혹은 직접 선언한 클러스터의 namenode에게 파일리스트, 위치를 요청하고 접근 허가를 얻는다. 각 데이터노드들에서 map작업이 시작되는데 이 작업은 Mapper 클래스의 map함수를 overide 구현한다. 이 과정에서 row단위로 읽어들여 텍스트를 key value 쌍으로 만들어준다. 이때 key value 쌍으로 만든다는 것은. id별 방문 횟수를 카운트 한다고 할 때, ['laesunk', 1] ['laesunk',1] 이런식으로 각 데이터 노드가 갖고 있는 raw데이터에서 라인별로 읽으며 id와 value를 단순 맵 형태로 만들어 줌을 이야기 한다. 이것은 맵 작업이 다 끝나게 되면 로컬로 파일로 쓰고 리듀서가 http 통신으로 가져간다. 이 사이에 셔플링, 컴바이닝 등을 통해 튜닝을 해볼 수 있다. 리듀서에서는 mapper로 부터 나오는 결과중 같은 key들을 합칩니다. 여러 mapper에서 온 데이터중 같은 key를 갖는 데이터가 있을테니 그것들을 합치면 key, iterable 의 형태로 만들게 된다. 그리고 각 input pair 마다 reduce 함수가 돌게 되는데 이때 value들은 iterable 한 형태이므로 추가 작업등을 더 추가 할 수도 있다. 

 JOBmain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
public class testjob{
    protected static Job job;
    protected static JobConf conf;
     
    private static String HDFS_PREFIX = "hadoophost";
    private static String MAP_TASKS_CONF_KEY = "mapred.map.tasks";
    private static String REDUCE_TASKS_CONF_KEY = "mapred.reduce.tasks";
 
 
    public static void main(String[] args) throws IOException {
        // TODO Auto-generated method stub
        conf = new JobConf();
        conf.addResource(new Path("mapred-site.xml"));
        conf.addResource(new Path("core-site.xml"));
        conf.addResource(new Path("hdfs-site.xml"));   //하둡 설정파일들 등록   여기서 하둡 데이터노드들 등의 환경을 읽어 온다.  버퍼 사이즈등등.  소스에서 세부적 옵션 셋업 코드 가능
 
        conf.set(MAP_TASKS_CONF_KEY, "12");    
        conf.set(REDUCE_TASKS_CONF_KEY, "3");          
 
        conf.setJar(args[0]);  //  파라미터로 jar 페키지 파일을 지정해준다.
     
        job = new Job(conf); 
        job.setJobName("jobname");
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(testMapper.class);            //맵퍼 클래스 아래 소스의 클래스  지정
        job.setReducerClass(testReducer.class);
        job.setOutputKeyClass(Text.class);    //   맵퍼에서 나갈때 키값 타입
        job.setOutputValueClass(IntWritable.class);    // 맵퍼에서 나갈때 벨류 타입
         
        try{       
            FileInputFormat.addInputPath(job,new Path("input path "));  // 이부분에 하둡에 들어있는 데이터 위치를 지정해준다.
            FileOutputFormat.setOutputPath(job, new Path("ouputpath")); //이부분에는 맵리듀스 잡으로 부터 나오는 결과 파일의 위치를 지정해준다. 이역시 하둡 상에 위치
        }catch(IOException e){
            e.printStackTrace();
        }
 
        try {
            job.waitForCompletion(true);
            Thread.sleep(10000);
        } catch (ClassNotFoundException e){
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e){
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
하둡에서는 int ,String의 타입을 쓰지않고 그것을 자체구현한 writable 클래스를 상속한 intwritable 따위의 객체로 쓴다. 직렬화를 위해 serializable을 구현해서 상속받은 것이다. 

 mapper source
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
public class testMapper extends Mapper{
    private Text keyVar = new Text();
    private IntWritable valueVar = new IntWritable();
    private String kvDelimeter = "\";
     
     
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
        String data = value.toString();    
        try{
                    String[] datas = data.split(kvDelimeter);
            String name=datas[0];
            String cnt = datas[1];
     
            keyVar.set(userId);
            valueVar.set(Integer.parseInt(count));
            context.write(keyVar,valueVar);
 
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}
맵에서 나온 데이터들은 각 키별로 소팅되어 reducer로 입력되어진다. 아래는 키값과 벨류리스트를 들어오면서 키에 벨류들의 총 합을 구하는 내용이다. 
 reducer source
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.io.IOException;
 
import org.apache.hadoop.io.IntWritable;
 
import org.apache.hadoop.io.Text;
 
import org.apache.hadoop.mapreduce.Reducer;
 
import org.apache.hadoop.mapreduce.Reducer.Context;
 
public class testReducer extends Reducer {
 
    private Text keyVar = new Text();
 
    private IntWritable valueVar = new IntWritable();
 
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException
 
 
    {
 
    int count=0;
 
        for(IntWritable value: values)
 
        {
 
            count+=value.get();
 
        }
 
        keyVar.set(key); valueVar.set(count);
 
        context.write(keyVar,valueVar);
 
    }
 
}


//