프로그래밍/Hadoop ETC 2017. 3. 22. 09:06

1) Submit job:
$ oozie job -oozie htp://localhost:11000/oozie -config oozieProject/workflowHdfsAndEmailActions/job.properties -submit job: 0000001-130712212133144-oozie-oozi-W
 
2) Run job:
$ oozie job -oozie htp://localhost:11000/oozie -start 0000001-130712212133144-oozie-oozi-W
 
3) Check the status:
$ oozie job -oozie htp://localhost:11000/oozie -info 0000001-130712212133144-oozie-oozi-W
 
4) Suspend workflow:
$ oozie job -oozie htp://localhost:11000/oozie -suspend 0000001-130712212133144-oozie-oozi-W
 
5) Resume workflow:
$ oozie job -oozie htp://localhost:11000/oozie -resume 0000001-130712212133144-oozie-oozi-W
 
6) Re-run workflow:
$ oozie job -oozie htp://localhost:11000/oozie -config oozieProject/workflowHdfsAndEmailActions/job.properties -rerun 0000001-130712212133144-oozie-oozi-W
 
7) Should you need to kill the job:
$ oozie job -oozie htp://localhost:11000/oozie -kill 0000001-130712212133144-oozie-oozi-W
 
8) View server logs:
$ oozie job -oozie htp://localhost:11000/oozie -logs 0000001-130712212133144-oozie-oozi-W
 
Logs are available at:
/var/log/oozie on the Oozie server.

//
프로그래밍/Hadoop ETC 2016. 5. 10. 14:17
hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt


//
프로그래밍/Hadoop ETC 2015. 12. 8. 21:46
특정 조건 하에 랭킹 x 순위 까지 구해야 할 경우가 있다. 예를 들면 일별 접근 횟수 랭킹 10 위까지의 ip를 구하는 경우? 아래와 같이 할 수 있을 것이다. (물론 자신의 사이트(서비스)에 접근할 때마다 로깅을 하여 데이터는 있다는 가정)
select t2.day, t2.ip, t2.cnt  from 
(
   select t1.day, t1.ip, t1.cnt, rank() over (PARTITION BY dd order by cnt desc) as rank from 
   (
      select day, ip, count(ip) as cnt from log where month = '09' group by dd, userip
   ) t1
) t2 where rank < 10;
//
프로그래밍/Hadoop ETC 2015. 8. 21. 23:28

하둡 파일 시스템에 파일을 저장할 때 내부 구조에 대해서 알아보고자 한다.

하둡 api들(put) 같은 명령어를 사용해서 파일을 hdfs상에 올리게 되는데 그 과정에 대해서 "시작하세요 하둡 프로그래밍" (저자:정재화) 책의 내용과 https://github.com/apache/hadoop 의 소스를  참고하여 정리한다.

기본적인 흐름 :

클라이언트의 요청 -> 네임노드와 통신 -> 권한 수락 -> 파일 저장 스트림 생성 -> 파일전송 -> 스트림 닫기

위와 같이 이루어집니다.

하둡 파일 시스템은 실제 파일들의 상태와 달리 일반 파일시스템과 같은 모양으로 다루기 위해 추상화를 시켜놓았다.

실제 하둡 소스에서 보면 FileSystem 이라는 클래스로 추상화되어져있다. 그리고 이 클래스를 상속하여 

DistributedFileSystem클래스를 정의한다. 이 클래스의 create 메소드를 이용하여 입력 스트림을 만든다. 이 입력 스트림 역시 FSDataOutputStream 을 생성, 반환한다.

그리고 나서 네임노드와 통신을 하여 각종 조건(권한, 데이터서 여유공간의 확인 등)을 체크한다. 

그 후 패킷(데이터)을 전송하기 시작한다.   리플리카셋 설정에 따라서 데이터 전송을 위한 파이프라인을 만들게 된다. 

네임노드는 저장할 타겟 데이터노드를 알려주고 클라이언트에서 해당 첫번째 데이터 노드로 전송을 시작한다. 첫번째 데이터노드는 두번째로, 두번째는 세번째로 패킷을 전송 요청, 전송 한다. 전송을 마친 데이터노드는 전송이 완료되었다는 ack를 자신에게 데이터 전송요청한 노드로 다시 전송한다. 

만약 위에 상황에서 fail 이된다면, 네임노드로부터 다시 장애가 생긴 노드를 제외한 데이터노드 리스트를 받아 위와 같은 순서로 재전송 하게 된다.

모든 데이터 노드에서 저장완료 ack 를 받게 되면 네임노드는  데이터가 정상 저장되었다는 것으로 인식하고 열어둔 파이프라인을 닫게 된다.


출처 : http://blrunner.com/?page=5

//
프로그래밍/Hadoop ETC 2015. 6. 17. 23:12
OOZIE는 하둡 잡을 스케쥴링하는 코디네이터 역할을 한다. 정해진 시간 스케쥴과, 특정 작업 workflow를 만들 수 있다. 예를 들면 MR잡으로 부터 나온 결과를 Mysql 로 넣는다던지, 중간에 가공을 한번 더 거치는 java 잡을 거친는 작업등을 스케쥴링 가능하다.

cloudera manager 및 oozie를 hdfs와 연동 및 설치하는 과정은 생략한다. 

참조 문서 : https://cwiki.apache.org/confluence/display/OOZIE/Map+Reduce+Cookbook

구성 :  workflow.xml, coordinator.xml , job.properties lib/ 각종 lib 파일들

명령 : 

1 ) hadoop fs -put OOZIE workflow 디렉토리 hdfs://hdfs주소/tmp/test_oozie <- oozie잡 또한 MR의 한 프로세스로 돌아가므로 hdfs 상에 올려주고, 실행한다.

2 ) oozie job -oozie http://hdfs주소:oozieport/oozie -config job.properties위치 -run   <- oozie 잡을 서브밋 하게 된다.
oozie 잡들은 hdfs 상에서 작동하므로 먼저 oozie잡 디렉토리를 올려주어야 한다. 



job.properties 속성.
jobTracker=***** 

nameNode=hdfs://******  <-- hdfs 도메인

workflowApplicationPath=${nameNode}/workflow디렉토리 <- 서브밋한 잡 path

oozie.coord.application.path=${workflowApplicationPath}/coordinator.xml  <- 코디네이터 파일(여기서 시간간격 등을 조절 가능하다.

oozie.use.system.libpath=true

oozie.libpath=${workflowApplicationPath}/lib  <- oozie job에 들어가는 mr jar 파일이 될 수도 있고 mysql jdbc jar 등이 위치하는 path

### coordinator name ###

coordNameSuffix=20150115 

### about time ###

timeoutCoord=10

concurrencyCoord=2

startDate=2015-01-15T12:40+0900   <-- job을 실행시킬 시작 시간 

endDate=2022-01-01T00:00+0900  <-- job을 종료시킬시간

inputInitialTime=2015-01-15T11:40+0900  <-- input 데이터의 입력시간. 즉 job 시작시간 이전으로 설정함으로 써 한시간 이전 데이터를 읽어 들이겠다는 의미.

outputInitialTime=2015-01-15T11:40+0900 

timezoneCoord=Asia/Seoul 

timezoneDataSet=Asia/Seoul

instanceIndex=-1  

coordFrequency=1

inputFrequency=1

outputFrequency=1

### path ###

InputPath= hdfs상의 input data위치

outputPath= hdfs상의 output data 위치 

### delete conf ###

deleteTmp=false

###  mapred.map.tasks value ###

maxSplitSize=33554432 

아래와 같이 RDB로 넣기 위해 db커넥션 정보도 넣었다.

jdbcUrl=

jdbcUsername=

jdbcPassword=

tableName=

filenamePattern=part-r-000*
##################### coordinator.xml ########################
아래 들어가는 ${ xxx } 는 위 job.properties 에서 읽어들이는 것이다.



    

        ${timeoutCoord}

        ${concurrencyCoord}

    

      <----- input data 의 주기 및 데이터 위치 등을 셋팅 해준다.

        

            ${inputPath}/${YEAR}/${MONTH}/${DAY}/${HOUR}

            

        

        

            ${outputPath}/${YEAR}/${MONTH}/${DAY}/${HOUR}

${YEAR}, 등은 OOZIE에서 지원한다. 

        

    

       

        

            ${coord:current(instanceIndex)}

        

    

    

        

            ${coord:current(instanceIndex)}

        

    

    

        

            ${workflowApplicationPath}/workflow.xml

            

                

                    queueName

                    default

                

                

                    inputRoot

                    ${coord:dataIn('input')}

                

                

                    outputRoot

                    ${coord:dataOut('output')}

                

                

                    inputYMDH

                    ${coord:formatTime(coord:dateOffset(coord:nominalTime(), inputFrequency*instanceIndex, 'HOUR'),'yyyyMMddHH')}

                

            

        

    



##################### workflow.xml ########################





        

  

        

            ${jobTracker}

            ${nameNode}

            ${workflowApplicationPath}/hive/hive-site.xml

            

                

                    oozie.hive.log.level

                    DEBUG

                

            

                

            outputDir=${outputRoot}

            inputY=${inputY}           

        

        

        

    

    

        

            ${fs:exists(outputRoot) == "true"} 

            

        

    

   
        
            ${jobTracker}
            ${nameNode}
            
                
                    oozie.sqoop.log.level
                    DEBUG
                
            
            export
            --connect
            ${jdbcURL}
            --username
            ${jdbcUsername}
            --password
            ${jdbcPassword}
            -m
            3
            --table
            ${exportTableName}
            --map-column-java                      
            DATE_IDX=java.sql.Date,column1=Integer,column2=Long
            --columns              
   DATE_IDX, column1, column2
            --input-fields-terminated-by
            \001
            --export-dir
            ${outputRoot}
        
        
        
    
    
    failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
    



//
프로그래밍/Hadoop ETC 2015. 6. 13. 10:15


1. hive 의 작동

hive 란 놈이 hdfs에 있는 파일을 SQL을 이용해서 조회 할 수 있도록 하는 녀석인데 

mapreduce  generator  정도로 보면 될 것 같다.   결국 잡은 mr  로 돈다. 

이놈을 조회 하기 위해서는 RDB같은  테이블을 생성해야한다, 보통 mysql과 연동하여 테이블 관련 메타정보들을 갖고 있게 한다.


2. table 만들기 , 관리

hive 테이블을 만들때는 두종류가 되는데 external, managed 테이블로 할 수 있다.

external로 하게 되면 테이블 형태만 만들고 hdfs의 위치를 location으로 잡아 줄 수 있다. 그럼 해당 위치(hdfs)에서 데이터를 직접 부른다. 테이블에 저장되는 데이터는 hdfs에 파일 형식(시퀀스, 텍스트)을 따른다. 해당 파일 형식을 stored as 로 지정 할 수 있다. 

파일 구분자를 따로 특문(multi character)등을 이용할 경우 따로 serializer,desirializer 를   구현해야 하는데 

row format serde 'org.apache.com.serde' 로 해당 클래스를  선언해서 사용 할 수 있다.

ex)

create EXTERNAL table test_data (

   userid STRING,

   csinfo map<string, string>)

partitioned by(type string, yyyy string, mm string, dd string, hh string)

row format serde 'org.apache.com.serde' 

WITH SERDEPROPERTIES ("field.delimited"="", "collection.delimited"="&","mapkey.delimited"="=")

stored as textfile;


3. 데이터 타입

data_type : primitive_type, array_type, map_type, struct_type

하이브에서는 컬렉션 타입을 지원한다. 이부분이 장점인듯.

위에 생성한 테이블처럼 타입을 선언하면 사용할 수 있다. 


4. 데이터 조회

일반적인 sql과 같이 이용할 수 있으며, 하지만 mr잡이라는 특수성을 이해해야 클러스터 노드들및 하이브에 무리를 주지 않을 수 있다.

예를 들면 특정조건없이 너무 많은 데이터를 select  할 경우 coudera 및 hive 서버가 다운될 위험도 있더라.

row format delimited

fields terminated by " " 로 delimeter 를 조절 할 수 있다.

insert overwrite local directory '/tmp' 로 select 결과 파일을 출력 할 수 있다. local을 빼개 되면 hdfs의 위치로 기본한다.)


4. 하이브 파티션

hive에는 partition이란 개념이 있는데 테이블을 만들때 partition by (yyyy string, mm string) 식으로 설정 할 수 있다.

파일(row단위로) 읽어서 처리하는것의 단점으로 모든 데이터를 다 불러들여 처리하면 많은 mapper가 돌게 되고 reduce 작업 또한 많아 지게 된다.

그래서 파일을 파티셔닝해서 읽어들이는 데이터를 적게 하는 전략인것이다. hdfs상에 파티션으로 사용할 필드?별로 파일을 저장하여 파티션으로 사용할 수 있다. 보통 일별, 월별, 년별 로 데이터를 조회하는 경우가 많으니 해당 조건(년,월,일)로 디렉토리를 생성하고 데이터를 적재한다. 그리고 이것을 파티션을 하여 hive  데이터 조회시에 이용한다. 

테이블 선언시에 이처럼 선언하여 할 수 도 있고,  

partitioned by(yyyy string, mm string, dd string)

테이블 생성 후에 alter 해서 사용 할 수 있다.

alter table test_table Add IF NOT EXISTS partition( yyyy='2015',mm='01',dd='15') location '/tmp/test/2015/01/15'

아마도 해당 파티션 필드만 이용하여 조회할 경우 데이터 사이즈를 아직 측정하지는 않았지만 mr을 돌지 않고 바로 나오는 듯 하다.

//
프로그래밍/Hadoop ETC 2015. 6. 13. 00:16

hive 조회 결과 파일로 떨구기 

insert overwrite local directory '/tmp/result'

select * from test_tbl 


//
프로그래밍/Hadoop ETC 2015. 3. 28. 11:53
하둡에서 데이터를 추출하는 가장 기본적인? 수동적인 방법인 mapreduce 를 java 로 구현한다. 

하둡의 기본 설치 및 configuration은 생략 맵과 리듀스 구조 하둡 기본 커맨드 http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html // 

fs 파일 io 와 관련된 커맨드 http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/CommandsManual.html ex) hadoop fs -mkdir '/tmp/temp' // 

job 실행 

ex)hadoop jar temp.jar

Job 구조 

 클라이언트(mr 잡을 실행하는 머쉰)에서 job을 시작하게 되면 소스에서 지정한 하둡설정 파일 혹은 직접 선언한 클러스터의 namenode에게 파일리스트, 위치를 요청하고 접근 허가를 얻는다. 각 데이터노드들에서 map작업이 시작되는데 이 작업은 Mapper 클래스의 map함수를 overide 구현한다. 이 과정에서 row단위로 읽어들여 텍스트를 key value 쌍으로 만들어준다. 이때 key value 쌍으로 만든다는 것은. id별 방문 횟수를 카운트 한다고 할 때, ['laesunk', 1] ['laesunk',1] 이런식으로 각 데이터 노드가 갖고 있는 raw데이터에서 라인별로 읽으며 id와 value를 단순 맵 형태로 만들어 줌을 이야기 한다. 이것은 맵 작업이 다 끝나게 되면 로컬로 파일로 쓰고 리듀서가 http 통신으로 가져간다. 이 사이에 셔플링, 컴바이닝 등을 통해 튜닝을 해볼 수 있다. 리듀서에서는 mapper로 부터 나오는 결과중 같은 key들을 합칩니다. 여러 mapper에서 온 데이터중 같은 key를 갖는 데이터가 있을테니 그것들을 합치면 key, iterable 의 형태로 만들게 된다. 그리고 각 input pair 마다 reduce 함수가 돌게 되는데 이때 value들은 iterable 한 형태이므로 추가 작업등을 더 추가 할 수도 있다. 

 JOBmain

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class testjob{
	protected static Job job;
	protected static JobConf conf;
	
	private static String HDFS_PREFIX = "hadoophost";
	private static String MAP_TASKS_CONF_KEY = "mapred.map.tasks";
	private static String REDUCE_TASKS_CONF_KEY = "mapred.reduce.tasks";


	public static void main(String[] args) throws IOException {
		// TODO Auto-generated method stub
		conf = new JobConf();
		conf.addResource(new Path("mapred-site.xml"));
		conf.addResource(new Path("core-site.xml"));
		conf.addResource(new Path("hdfs-site.xml"));   //하둡 설정파일들 등록   여기서 하둡 데이터노드들 등의 환경을 읽어 온다.  버퍼 사이즈등등.  소스에서 세부적 옵션 셋업 코드 가능

		conf.set(MAP_TASKS_CONF_KEY, "12");     
		conf.set(REDUCE_TASKS_CONF_KEY, "3");			

		conf.setJar(args[0]);  //  파라미터로 jar 페키지 파일을 지정해준다.
	
		job = new Job(conf);  
		job.setJobName("jobname");
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setMapperClass(testMapper.class);            //맵퍼 클래스 아래 소스의 클래스  지정
		job.setReducerClass(testReducer.class);
		job.setOutputKeyClass(Text.class);    //   맵퍼에서 나갈때 키값 타입 
		job.setOutputValueClass(IntWritable.class);    // 맵퍼에서 나갈때 벨류 타입 
		
		try{		
			FileInputFormat.addInputPath(job,new Path("input path "));  // 이부분에 하둡에 들어있는 데이터 위치를 지정해준다. 
			FileOutputFormat.setOutputPath(job, new Path("ouputpath")); //이부분에는 맵리듀스 잡으로 부터 나오는 결과 파일의 위치를 지정해준다. 이역시 하둡 상에 위치
		}catch(IOException e){
			e.printStackTrace();
		}

		try {
			job.waitForCompletion(true);
			Thread.sleep(10000);
		} catch (ClassNotFoundException e){
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (InterruptedException e){
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}
하둡에서는 int ,String의 타입을 쓰지않고 그것을 자체구현한 writable 클래스를 상속한 intwritable 따위의 객체로 쓴다. 직렬화를 위해 serializable을 구현해서 상속받은 것이다. 

 mapper source
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class testMapper extends Mapper{
	private Text keyVar = new Text();
	private IntWritable valueVar = new IntWritable();
	private String kvDelimeter = "\";
	
	
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
		String data = value.toString();		
		try{
	                String[] datas = data.split(kvDelimeter);
			String name=datas[0];
			String cnt = datas[1];
	
			keyVar.set(userId);
			valueVar.set(Integer.parseInt(count));
			context.write(keyVar,valueVar);

		}catch(Exception e){
			e.printStackTrace();
		}
	}
}
맵에서 나온 데이터들은 각 키별로 소팅되어 reducer로 입력되어진다. 아래는 키값과 벨류리스트를 들어오면서 키에 벨류들의 총 합을 구하는 내용이다. 
 reducer source
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.Reducer.Context;

public class testReducer extends Reducer {

	private Text keyVar = new Text();

	private IntWritable valueVar = new IntWritable();

	protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException 


	{ 

	int count=0;

		for(IntWritable value: values)

		{ 

			count+=value.get(); 

		}

		keyVar.set(key); valueVar.set(count); 

		context.write(keyVar,valueVar); 

	}

}


//
프로그래밍/Hadoop ETC 2015. 3. 26. 14:55

http://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/YarnCommands.html#logs


Dump the container logs

  Usage: yarn logs <options>
COMMAND_OPTIONSDescription
-applicationId ApplicationIdSpecify an application id
-appOwner AppOwnerSpecify an application owner
-containerId ContainerIdSpecify a container id
-nodeAddress NodeAddressSpecify a node address


example) yarn logs -applicationId job1238547128312

//