job 실행
ex)hadoop jar temp.jar
Job 구조
클라이언트(mr 잡을 실행하는 머쉰)에서 job을 시작하게 되면 소스에서 지정한 하둡설정 파일 혹은 직접 선언한 클러스터의 namenode에게 파일리스트, 위치를 요청하고 접근 허가를 얻는다. 각 데이터노드들에서 map작업이 시작되는데 이 작업은 Mapper 클래스의 map함수를 overide 구현한다. 이 과정에서 row단위로 읽어들여 텍스트를 key value 쌍으로 만들어준다. 이때 key value 쌍으로 만든다는 것은. id별 방문 횟수를 카운트 한다고 할 때, ['laesunk', 1] ['laesunk',1] 이런식으로 각 데이터 노드가 갖고 있는 raw데이터에서 라인별로 읽으며 id와 value를 단순 맵 형태로 만들어 줌을 이야기 한다. 이것은 맵 작업이 다 끝나게 되면 로컬로 파일로 쓰고 리듀서가 http 통신으로 가져간다. 이 사이에 셔플링, 컴바이닝 등을 통해 튜닝을 해볼 수 있다. 리듀서에서는 mapper로 부터 나오는 결과중 같은 key들을 합칩니다. 여러 mapper에서 온 데이터중 같은 key를 갖는 데이터가 있을테니 그것들을 합치면 key, iterable 의 형태로 만들게 된다. 그리고 각 input pair 마다 reduce 함수가 돌게 되는데 이때 value들은 iterable 한 형태이므로 추가 작업등을 더 추가 할 수도 있다.
JOBmain
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class testjob{ protected static Job job; protected static JobConf conf; private static String HDFS_PREFIX = "hadoophost"; private static String MAP_TASKS_CONF_KEY = "mapred.map.tasks"; private static String REDUCE_TASKS_CONF_KEY = "mapred.reduce.tasks"; public static void main(String[] args) throws IOException { // TODO Auto-generated method stub conf = new JobConf(); conf.addResource(new Path("mapred-site.xml")); conf.addResource(new Path("core-site.xml")); conf.addResource(new Path("hdfs-site.xml")); //하둡 설정파일들 등록 여기서 하둡 데이터노드들 등의 환경을 읽어 온다. 버퍼 사이즈등등. 소스에서 세부적 옵션 셋업 코드 가능 conf.set(MAP_TASKS_CONF_KEY, "12"); conf.set(REDUCE_TASKS_CONF_KEY, "3"); conf.setJar(args[0]); // 파라미터로 jar 페키지 파일을 지정해준다. job = new Job(conf); job.setJobName("jobname"); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapperClass(testMapper.class); //맵퍼 클래스 아래 소스의 클래스 지정 job.setReducerClass(testReducer.class); job.setOutputKeyClass(Text.class); // 맵퍼에서 나갈때 키값 타입 job.setOutputValueClass(IntWritable.class); // 맵퍼에서 나갈때 벨류 타입 try{ FileInputFormat.addInputPath(job,new Path("input path ")); // 이부분에 하둡에 들어있는 데이터 위치를 지정해준다. FileOutputFormat.setOutputPath(job, new Path("ouputpath")); //이부분에는 맵리듀스 잡으로 부터 나오는 결과 파일의 위치를 지정해준다. 이역시 하둡 상에 위치 }catch(IOException e){ e.printStackTrace(); } try { job.waitForCompletion(true); Thread.sleep(10000); } catch (ClassNotFoundException e){ // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e){ // TODO Auto-generated catch block e.printStackTrace(); } } }하둡에서는 int ,String의 타입을 쓰지않고 그것을 자체구현한 writable 클래스를 상속한 intwritable 따위의 객체로 쓴다. 직렬화를 위해 serializable을 구현해서 상속받은 것이다.
import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class testMapper extends Mapper{ private Text keyVar = new Text(); private IntWritable valueVar = new IntWritable(); private String kvDelimeter = "\"; protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String data = value.toString(); try{ String[] datas = data.split(kvDelimeter); String name=datas[0]; String cnt = datas[1]; keyVar.set(userId); valueVar.set(Integer.parseInt(count)); context.write(keyVar,valueVar); }catch(Exception e){ e.printStackTrace(); } } }맵에서 나온 데이터들은 각 키별로 소팅되어 reducer로 입력되어진다. 아래는 키값과 벨류리스트를 들어오면서 키에 벨류들의 총 합을 구하는 내용이다.
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; public class testReducer extends Reducer { private Text keyVar = new Text(); private IntWritable valueVar = new IntWritable(); protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int count=0; for(IntWritable value: values) { count+=value.get(); } keyVar.set(key); valueVar.set(count); context.write(keyVar,valueVar); } }
'프로그래밍 > Hadoop ETC' 카테고리의 다른 글
[Hadoop] hdfs에 데이터 올라가는 과정 (0) | 2015.08.21 |
---|---|
[Hadoop] OOZIE 사용 (with hive, sqoop) (0) | 2015.06.17 |
[Hadoop] hive 파티션 설정, external table, datatype (0) | 2015.06.13 |
hive 조회 결과 파일로 떨구기 (0) | 2015.06.13 |
MR job log 파일 뽑기 (0) | 2015.03.26 |