job 실행
ex)hadoop jar temp.jar
Job 구조
클라이언트(mr 잡을 실행하는 머쉰)에서 job을 시작하게 되면 소스에서 지정한 하둡설정 파일 혹은 직접 선언한 클러스터의 namenode에게 파일리스트, 위치를 요청하고 접근 허가를 얻는다. 각 데이터노드들에서 map작업이 시작되는데 이 작업은 Mapper 클래스의 map함수를 overide 구현한다. 이 과정에서 row단위로 읽어들여 텍스트를 key value 쌍으로 만들어준다. 이때 key value 쌍으로 만든다는 것은. id별 방문 횟수를 카운트 한다고 할 때, ['laesunk', 1] ['laesunk',1] 이런식으로 각 데이터 노드가 갖고 있는 raw데이터에서 라인별로 읽으며 id와 value를 단순 맵 형태로 만들어 줌을 이야기 한다. 이것은 맵 작업이 다 끝나게 되면 로컬로 파일로 쓰고 리듀서가 http 통신으로 가져간다. 이 사이에 셔플링, 컴바이닝 등을 통해 튜닝을 해볼 수 있다. 리듀서에서는 mapper로 부터 나오는 결과중 같은 key들을 합칩니다. 여러 mapper에서 온 데이터중 같은 key를 갖는 데이터가 있을테니 그것들을 합치면 key, iterable 의 형태로 만들게 된다. 그리고 각 input pair 마다 reduce 함수가 돌게 되는데 이때 value들은 iterable 한 형태이므로 추가 작업등을 더 추가 할 수도 있다.
JOBmain
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class testjob{ protected static Job job; protected static JobConf conf; private static String HDFS_PREFIX = "hadoophost" ; private static String MAP_TASKS_CONF_KEY = "mapred.map.tasks" ; private static String REDUCE_TASKS_CONF_KEY = "mapred.reduce.tasks" ; public static void main(String[] args) throws IOException { // TODO Auto-generated method stub conf = new JobConf(); conf.addResource( new Path( "mapred-site.xml" )); conf.addResource( new Path( "core-site.xml" )); conf.addResource( new Path( "hdfs-site.xml" )); //하둡 설정파일들 등록 여기서 하둡 데이터노드들 등의 환경을 읽어 온다. 버퍼 사이즈등등. 소스에서 세부적 옵션 셋업 코드 가능 conf.set(MAP_TASKS_CONF_KEY, "12" ); conf.set(REDUCE_TASKS_CONF_KEY, "3" ); conf.setJar(args[ 0 ]); // 파라미터로 jar 페키지 파일을 지정해준다. job = new Job(conf); job.setJobName( "jobname" ); job.setInputFormatClass(TextInputFormat. class ); job.setOutputFormatClass(TextOutputFormat. class ); job.setMapperClass(testMapper. class ); //맵퍼 클래스 아래 소스의 클래스 지정 job.setReducerClass(testReducer. class ); job.setOutputKeyClass(Text. class ); // 맵퍼에서 나갈때 키값 타입 job.setOutputValueClass(IntWritable. class ); // 맵퍼에서 나갈때 벨류 타입 try { FileInputFormat.addInputPath(job, new Path( "input path " )); // 이부분에 하둡에 들어있는 데이터 위치를 지정해준다. FileOutputFormat.setOutputPath(job, new Path( "ouputpath" )); //이부분에는 맵리듀스 잡으로 부터 나오는 결과 파일의 위치를 지정해준다. 이역시 하둡 상에 위치 } catch (IOException e){ e.printStackTrace(); } try { job.waitForCompletion( true ); Thread.sleep( 10000 ); } catch (ClassNotFoundException e){ // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e){ // TODO Auto-generated catch block e.printStackTrace(); } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class testMapper extends Mapper{ private Text keyVar = new Text(); private IntWritable valueVar = new IntWritable(); private String kvDelimeter = "\"; protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String data = value.toString(); try { String[] datas = data.split(kvDelimeter); String name=datas[ 0 ]; String cnt = datas[ 1 ]; keyVar.set(userId); valueVar.set(Integer.parseInt(count)); context.write(keyVar,valueVar); } catch (Exception e){ e.printStackTrace(); } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; public class testReducer extends Reducer { private Text keyVar = new Text(); private IntWritable valueVar = new IntWritable(); protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int count= 0 ; for (IntWritable value: values) { count+=value.get(); } keyVar.set(key); valueVar.set(count); context.write(keyVar,valueVar); } } |
'프로그래밍 > Hadoop ETC' 카테고리의 다른 글
[Hadoop] hdfs에 데이터 올라가는 과정 (0) | 2015.08.21 |
---|---|
[Hadoop] OOZIE 사용 (with hive, sqoop) (0) | 2015.06.17 |
[Hadoop] hive 파티션 설정, external table, datatype (0) | 2015.06.13 |
hive 조회 결과 파일로 떨구기 (0) | 2015.06.13 |
MR job log 파일 뽑기 (0) | 2015.03.26 |