'MapReduce'에 해당되는 글 1건

  1. 2015.03.28 MR(mapreduce) with java
프로그래밍/Hadoop ETC 2015. 3. 28. 11:53
하둡에서 데이터를 추출하는 가장 기본적인? 수동적인 방법인 mapreduce 를 java 로 구현한다. 

하둡의 기본 설치 및 configuration은 생략 맵과 리듀스 구조 하둡 기본 커맨드 http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html // 

fs 파일 io 와 관련된 커맨드 http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/CommandsManual.html ex) hadoop fs -mkdir '/tmp/temp' // 

job 실행 

ex)hadoop jar temp.jar

Job 구조 

 클라이언트(mr 잡을 실행하는 머쉰)에서 job을 시작하게 되면 소스에서 지정한 하둡설정 파일 혹은 직접 선언한 클러스터의 namenode에게 파일리스트, 위치를 요청하고 접근 허가를 얻는다. 각 데이터노드들에서 map작업이 시작되는데 이 작업은 Mapper 클래스의 map함수를 overide 구현한다. 이 과정에서 row단위로 읽어들여 텍스트를 key value 쌍으로 만들어준다. 이때 key value 쌍으로 만든다는 것은. id별 방문 횟수를 카운트 한다고 할 때, ['laesunk', 1] ['laesunk',1] 이런식으로 각 데이터 노드가 갖고 있는 raw데이터에서 라인별로 읽으며 id와 value를 단순 맵 형태로 만들어 줌을 이야기 한다. 이것은 맵 작업이 다 끝나게 되면 로컬로 파일로 쓰고 리듀서가 http 통신으로 가져간다. 이 사이에 셔플링, 컴바이닝 등을 통해 튜닝을 해볼 수 있다. 리듀서에서는 mapper로 부터 나오는 결과중 같은 key들을 합칩니다. 여러 mapper에서 온 데이터중 같은 key를 갖는 데이터가 있을테니 그것들을 합치면 key, iterable 의 형태로 만들게 된다. 그리고 각 input pair 마다 reduce 함수가 돌게 되는데 이때 value들은 iterable 한 형태이므로 추가 작업등을 더 추가 할 수도 있다. 

 JOBmain

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class testjob{
	protected static Job job;
	protected static JobConf conf;
	
	private static String HDFS_PREFIX = "hadoophost";
	private static String MAP_TASKS_CONF_KEY = "mapred.map.tasks";
	private static String REDUCE_TASKS_CONF_KEY = "mapred.reduce.tasks";


	public static void main(String[] args) throws IOException {
		// TODO Auto-generated method stub
		conf = new JobConf();
		conf.addResource(new Path("mapred-site.xml"));
		conf.addResource(new Path("core-site.xml"));
		conf.addResource(new Path("hdfs-site.xml"));   //하둡 설정파일들 등록   여기서 하둡 데이터노드들 등의 환경을 읽어 온다.  버퍼 사이즈등등.  소스에서 세부적 옵션 셋업 코드 가능

		conf.set(MAP_TASKS_CONF_KEY, "12");     
		conf.set(REDUCE_TASKS_CONF_KEY, "3");			

		conf.setJar(args[0]);  //  파라미터로 jar 페키지 파일을 지정해준다.
	
		job = new Job(conf);  
		job.setJobName("jobname");
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setMapperClass(testMapper.class);            //맵퍼 클래스 아래 소스의 클래스  지정
		job.setReducerClass(testReducer.class);
		job.setOutputKeyClass(Text.class);    //   맵퍼에서 나갈때 키값 타입 
		job.setOutputValueClass(IntWritable.class);    // 맵퍼에서 나갈때 벨류 타입 
		
		try{		
			FileInputFormat.addInputPath(job,new Path("input path "));  // 이부분에 하둡에 들어있는 데이터 위치를 지정해준다. 
			FileOutputFormat.setOutputPath(job, new Path("ouputpath")); //이부분에는 맵리듀스 잡으로 부터 나오는 결과 파일의 위치를 지정해준다. 이역시 하둡 상에 위치
		}catch(IOException e){
			e.printStackTrace();
		}

		try {
			job.waitForCompletion(true);
			Thread.sleep(10000);
		} catch (ClassNotFoundException e){
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (InterruptedException e){
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}
하둡에서는 int ,String의 타입을 쓰지않고 그것을 자체구현한 writable 클래스를 상속한 intwritable 따위의 객체로 쓴다. 직렬화를 위해 serializable을 구현해서 상속받은 것이다. 

 mapper source
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class testMapper extends Mapper{
	private Text keyVar = new Text();
	private IntWritable valueVar = new IntWritable();
	private String kvDelimeter = "\";
	
	
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
		String data = value.toString();		
		try{
	                String[] datas = data.split(kvDelimeter);
			String name=datas[0];
			String cnt = datas[1];
	
			keyVar.set(userId);
			valueVar.set(Integer.parseInt(count));
			context.write(keyVar,valueVar);

		}catch(Exception e){
			e.printStackTrace();
		}
	}
}
맵에서 나온 데이터들은 각 키별로 소팅되어 reducer로 입력되어진다. 아래는 키값과 벨류리스트를 들어오면서 키에 벨류들의 총 합을 구하는 내용이다. 
 reducer source
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.Reducer.Context;

public class testReducer extends Reducer {

	private Text keyVar = new Text();

	private IntWritable valueVar = new IntWritable();

	protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException 


	{ 

	int count=0;

		for(IntWritable value: values)

		{ 

			count+=value.get(); 

		}

		keyVar.set(key); valueVar.set(count); 

		context.write(keyVar,valueVar); 

	}

}


//