프로그래밍/Hadoop ETC 2015. 8. 21. 23:28

하둡 파일 시스템에 파일을 저장할 때 내부 구조에 대해서 알아보고자 한다.

하둡 api들(put) 같은 명령어를 사용해서 파일을 hdfs상에 올리게 되는데 그 과정에 대해서 "시작하세요 하둡 프로그래밍" (저자:정재화) 책의 내용과 https://github.com/apache/hadoop 의 소스를  참고하여 정리한다.

기본적인 흐름 :

클라이언트의 요청 -> 네임노드와 통신 -> 권한 수락 -> 파일 저장 스트림 생성 -> 파일전송 -> 스트림 닫기

위와 같이 이루어집니다.

하둡 파일 시스템은 실제 파일들의 상태와 달리 일반 파일시스템과 같은 모양으로 다루기 위해 추상화를 시켜놓았다.

실제 하둡 소스에서 보면 FileSystem 이라는 클래스로 추상화되어져있다. 그리고 이 클래스를 상속하여 

DistributedFileSystem클래스를 정의한다. 이 클래스의 create 메소드를 이용하여 입력 스트림을 만든다. 이 입력 스트림 역시 FSDataOutputStream 을 생성, 반환한다.

그리고 나서 네임노드와 통신을 하여 각종 조건(권한, 데이터서 여유공간의 확인 등)을 체크한다. 

그 후 패킷(데이터)을 전송하기 시작한다.   리플리카셋 설정에 따라서 데이터 전송을 위한 파이프라인을 만들게 된다. 

네임노드는 저장할 타겟 데이터노드를 알려주고 클라이언트에서 해당 첫번째 데이터 노드로 전송을 시작한다. 첫번째 데이터노드는 두번째로, 두번째는 세번째로 패킷을 전송 요청, 전송 한다. 전송을 마친 데이터노드는 전송이 완료되었다는 ack를 자신에게 데이터 전송요청한 노드로 다시 전송한다. 

만약 위에 상황에서 fail 이된다면, 네임노드로부터 다시 장애가 생긴 노드를 제외한 데이터노드 리스트를 받아 위와 같은 순서로 재전송 하게 된다.

모든 데이터 노드에서 저장완료 ack 를 받게 되면 네임노드는  데이터가 정상 저장되었다는 것으로 인식하고 열어둔 파이프라인을 닫게 된다.


출처 : http://blrunner.com/?page=5

//
프로그래밍/ElasticSearch 2015. 7. 19. 00:18

elasticsearch 에 analyzer plugin을 만들다보니 offsetAttribue 클래스를 사용하게 되었다

이부분이 잘못되게 되면 하이라이팅이 엉뚱하게 된다. 그래서 token stream에서 나온 토큰들을 짤라서 작업할 때는 

offsetAttribue의 setoffset 메소드를 이용해 offset을 확실히 해주어야 한다. 


//
프로그래밍/ElasticSearch 2015. 7. 15. 21:44
elasticsearch에 인덱스를 만들때 일반 json ES query 로 만들 수 있지만 여기서는 자바로 인덱스를 생성하는 방법을 기록한다. elasticsearch에서는 쉽게 json 하이어라키를 만들어 낼 수 있도록 XcontentBuilder 클래스를 제공한다. startObject로 시작하여 field라는 항목으로 하위 노드에 추가 할 수 있다.
XContentBuilder mappings = null; 
  try {
   mappings = XContentFactory.jsonBuilder();
   mappings.startObject()
    .startObject("test_index")
     .field("dynamic","false")  // dynamic mapping 을 거부한다.
     .startObject("_all").field("enabled",false).endObject()  //모든 필드를 찾지 않도록 한다.
     .startObject("properties")  // 인덱스 내부에 필드들은 properties 아래 만들어져야한다.
       .startObject("document_title")
          .field("type", "string")
          .startObject("norms").field("enabled", false).endObject()
          .field("store", "yes")  //  인덱스에 추가하는 것 뿐 아니라 저장하여 디스플레이 가능하도록 setting
          .field("index", "analyzed")  // 해당 필드 분석하도록 한다. 
          .field("index_analyzer", "name_analyzer")   // 인덱스시에 이용하는 analyzer를 명시할 수 있다.
          .field("search_analyzer", "name_analyzer")
       .endObject()
      .startObject("name_number").field("type", "long").field("store", "yes").endObject()
      .startObject("st_number").field("type", "long").field("store", "yes").endObject()
      .startObject("type_info")
       .field("type", "nested")  // nested 항목으로 만들 수 있다. 이렇게 되면 하나의 document로 만들어지는 듯.
       .startObject("properties")
        .startObject("type_name")
         .field("type", "string")
        .endObject()
        .startObject("type_value")
         .field("type", "double")
        .endObject()
       .endObject()
      .endObject()
     .endObject()
    .endObject()
   .endObject();
  } catch (IOException e) {
   e.printStackTrace();
  }
  
  XContentBuilder indexSettings = null; 
  try {
   indexSettings = XContentFactory.jsonBuilder(); 
   indexSettings.startObject()
       .field("index.number_of_shards", 3)   // 샤드 갯수 지정
       .field("index.number_of_replicas", 3)  // 레플리카 갯수 지정
//       .field("index.store.type", "memory")  // 메모리에 인덱스 데이터 저장 할 지 
       .endObject();
  } catch (IOException e) {
   e.printStackTrace();
  }
  
  CreateIndexRequestBuilder indexCreateRequest = client.admin().indices().prepareCreate(indexName)
    .addMapping("test_index", mappings)  // 메핑 json 데이터를 인덱스 네임파라미터와 함께 setting
    .setSettings(indexSettings);  // index setting 에 관한 정보는 setsettings라는 함수로 만듬
  try{
   indexCreateRequest.execute().actionGet();
  }catch(Exception e){
   e.printStackTrace();
  }
//
프로그래밍/ElasticSearch 2015. 7. 9. 10:02

maven 패키징 & 카피 후에

elastic_path/bin/plugin --url file:///absolute/path/to/elasticsearch-cardinality-plugin-0.0.1.jar --install plugin_name

주의점 : file:// 를 꼭 붙여줘야한다는 것. 그냥 file path 예시인줄...


다른방법.

plugins 밑에 디렉토리에 플러그인 네임과 같은 디렉토리를 만들고 jar를 위치시킨다.

그리고 ES를 재시작한다. 

//
프로그래밍/ElasticSearch 2015. 7. 9. 09:59

analyzer plugin을 수정하고 올리고 나서 몇번의 리스타트 후에 이런 에러를 만낫다.

마스터 노드의 로그상에서는 StringOutofIndex  에러만 나게 되는데 어디서 나는지에 대한 것은 찾아볼 수 없다.

그러다가 analyzer에서 token을 substring 하는 부분에서 테스트적으로 넣엇던 것이 실수로 머지 되어 패키징 했다는 것을 알고 수정.

결론 : elasticsearch의 로그가 생각보다 친절 하지 않다.

//
프로그래밍/Hadoop ETC 2015. 6. 17. 23:12
OOZIE는 하둡 잡을 스케쥴링하는 코디네이터 역할을 한다. 정해진 시간 스케쥴과, 특정 작업 workflow를 만들 수 있다. 예를 들면 MR잡으로 부터 나온 결과를 Mysql 로 넣는다던지, 중간에 가공을 한번 더 거치는 java 잡을 거친는 작업등을 스케쥴링 가능하다.

cloudera manager 및 oozie를 hdfs와 연동 및 설치하는 과정은 생략한다. 

참조 문서 : https://cwiki.apache.org/confluence/display/OOZIE/Map+Reduce+Cookbook

구성 :  workflow.xml, coordinator.xml , job.properties lib/ 각종 lib 파일들

명령 : 

1 ) hadoop fs -put OOZIE workflow 디렉토리 hdfs://hdfs주소/tmp/test_oozie <- oozie잡 또한 MR의 한 프로세스로 돌아가므로 hdfs 상에 올려주고, 실행한다.

2 ) oozie job -oozie http://hdfs주소:oozieport/oozie -config job.properties위치 -run   <- oozie 잡을 서브밋 하게 된다.
oozie 잡들은 hdfs 상에서 작동하므로 먼저 oozie잡 디렉토리를 올려주어야 한다. 



job.properties 속성.
jobTracker=***** 

nameNode=hdfs://******  <-- hdfs 도메인

workflowApplicationPath=${nameNode}/workflow디렉토리 <- 서브밋한 잡 path

oozie.coord.application.path=${workflowApplicationPath}/coordinator.xml  <- 코디네이터 파일(여기서 시간간격 등을 조절 가능하다.

oozie.use.system.libpath=true

oozie.libpath=${workflowApplicationPath}/lib  <- oozie job에 들어가는 mr jar 파일이 될 수도 있고 mysql jdbc jar 등이 위치하는 path

### coordinator name ###

coordNameSuffix=20150115 

### about time ###

timeoutCoord=10

concurrencyCoord=2

startDate=2015-01-15T12:40+0900   <-- job을 실행시킬 시작 시간 

endDate=2022-01-01T00:00+0900  <-- job을 종료시킬시간

inputInitialTime=2015-01-15T11:40+0900  <-- input 데이터의 입력시간. 즉 job 시작시간 이전으로 설정함으로 써 한시간 이전 데이터를 읽어 들이겠다는 의미.

outputInitialTime=2015-01-15T11:40+0900 

timezoneCoord=Asia/Seoul 

timezoneDataSet=Asia/Seoul

instanceIndex=-1  

coordFrequency=1

inputFrequency=1

outputFrequency=1

### path ###

InputPath= hdfs상의 input data위치

outputPath= hdfs상의 output data 위치 

### delete conf ###

deleteTmp=false

###  mapred.map.tasks value ###

maxSplitSize=33554432 

아래와 같이 RDB로 넣기 위해 db커넥션 정보도 넣었다.

jdbcUrl=

jdbcUsername=

jdbcPassword=

tableName=

filenamePattern=part-r-000*
##################### coordinator.xml ########################
아래 들어가는 ${ xxx } 는 위 job.properties 에서 읽어들이는 것이다.



    

        ${timeoutCoord}

        ${concurrencyCoord}

    

      <----- input data 의 주기 및 데이터 위치 등을 셋팅 해준다.

        

            ${inputPath}/${YEAR}/${MONTH}/${DAY}/${HOUR}

            

        

        

            ${outputPath}/${YEAR}/${MONTH}/${DAY}/${HOUR}

${YEAR}, 등은 OOZIE에서 지원한다. 

        

    

       

        

            ${coord:current(instanceIndex)}

        

    

    

        

            ${coord:current(instanceIndex)}

        

    

    

        

            ${workflowApplicationPath}/workflow.xml

            

                

                    queueName

                    default

                

                

                    inputRoot

                    ${coord:dataIn('input')}

                

                

                    outputRoot

                    ${coord:dataOut('output')}

                

                

                    inputYMDH

                    ${coord:formatTime(coord:dateOffset(coord:nominalTime(), inputFrequency*instanceIndex, 'HOUR'),'yyyyMMddHH')}

                

            

        

    



##################### workflow.xml ########################





        

  

        

            ${jobTracker}

            ${nameNode}

            ${workflowApplicationPath}/hive/hive-site.xml

            

                

                    oozie.hive.log.level

                    DEBUG

                

            

                

            outputDir=${outputRoot}

            inputY=${inputY}           

        

        

        

    

    

        

            ${fs:exists(outputRoot) == "true"} 

            

        

    

   
        
            ${jobTracker}
            ${nameNode}
            
                
                    oozie.sqoop.log.level
                    DEBUG
                
            
            export
            --connect
            ${jdbcURL}
            --username
            ${jdbcUsername}
            --password
            ${jdbcPassword}
            -m
            3
            --table
            ${exportTableName}
            --map-column-java                      
            DATE_IDX=java.sql.Date,column1=Integer,column2=Long
            --columns              
   DATE_IDX, column1, column2
            --input-fields-terminated-by
            \001
            --export-dir
            ${outputRoot}
        
        
        
    
    
    failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
    



//
프로그래밍/ElasticSearch 2015. 6. 13. 10:28

루씬을 사용하는 ES는 루씬 스코어링 과정을 따른다. 따로 analyzer를 구현해서 사용하는 터라 특정 필드에 매칭되서 얼마의 scoring이 되는가를 확인하기 위해서 explain 을 사용할 수 있다. RDB의 explain과 비슷하게 scoring 절차, 결과를 볼 수 있다.


query 을 활용 한 curl 쿼리

curl -XGET 'localhost:9200/twitter/tweet/1/_explain' -d '{ "query" : { "term" : { "message" : "search" } } }'


query_string을 활용 한 curl 쿼리

curl -XGET 'localhost:9200/twitter/tweet/1/_explain?q=message:search'
"query_string": { "query": "content:this AND available:yes" }

이런식도 가능
"query_string": { "query": "content:this OR that" }


Json query

{ "explain": true, "query" : { "term" : { "user" : "kimchy" } }


결과 json

{ "matches" : true, "explanation" : { "value" : 0.15342641, "description" : "fieldWeight(message:search in 0), product of:", "details" : [ { "value" : 1.0, "description" : "tf(termFreq(message:search)=1)" }, { "value" : 0.30685282, "description" : "idf(docFreq=1, maxDocs=1)" }, { "value" : 0.5, "description" : "fieldNorm(field=message, doc=0)" } ] } }

//
프로그래밍/Hadoop ETC 2015. 6. 13. 10:15


1. hive 의 작동

hive 란 놈이 hdfs에 있는 파일을 SQL을 이용해서 조회 할 수 있도록 하는 녀석인데 

mapreduce  generator  정도로 보면 될 것 같다.   결국 잡은 mr  로 돈다. 

이놈을 조회 하기 위해서는 RDB같은  테이블을 생성해야한다, 보통 mysql과 연동하여 테이블 관련 메타정보들을 갖고 있게 한다.


2. table 만들기 , 관리

hive 테이블을 만들때는 두종류가 되는데 external, managed 테이블로 할 수 있다.

external로 하게 되면 테이블 형태만 만들고 hdfs의 위치를 location으로 잡아 줄 수 있다. 그럼 해당 위치(hdfs)에서 데이터를 직접 부른다. 테이블에 저장되는 데이터는 hdfs에 파일 형식(시퀀스, 텍스트)을 따른다. 해당 파일 형식을 stored as 로 지정 할 수 있다. 

파일 구분자를 따로 특문(multi character)등을 이용할 경우 따로 serializer,desirializer 를   구현해야 하는데 

row format serde 'org.apache.com.serde' 로 해당 클래스를  선언해서 사용 할 수 있다.

ex)

create EXTERNAL table test_data (

   userid STRING,

   csinfo map<string, string>)

partitioned by(type string, yyyy string, mm string, dd string, hh string)

row format serde 'org.apache.com.serde' 

WITH SERDEPROPERTIES ("field.delimited"="", "collection.delimited"="&","mapkey.delimited"="=")

stored as textfile;


3. 데이터 타입

data_type : primitive_type, array_type, map_type, struct_type

하이브에서는 컬렉션 타입을 지원한다. 이부분이 장점인듯.

위에 생성한 테이블처럼 타입을 선언하면 사용할 수 있다. 


4. 데이터 조회

일반적인 sql과 같이 이용할 수 있으며, 하지만 mr잡이라는 특수성을 이해해야 클러스터 노드들및 하이브에 무리를 주지 않을 수 있다.

예를 들면 특정조건없이 너무 많은 데이터를 select  할 경우 coudera 및 hive 서버가 다운될 위험도 있더라.

row format delimited

fields terminated by " " 로 delimeter 를 조절 할 수 있다.

insert overwrite local directory '/tmp' 로 select 결과 파일을 출력 할 수 있다. local을 빼개 되면 hdfs의 위치로 기본한다.)


4. 하이브 파티션

hive에는 partition이란 개념이 있는데 테이블을 만들때 partition by (yyyy string, mm string) 식으로 설정 할 수 있다.

파일(row단위로) 읽어서 처리하는것의 단점으로 모든 데이터를 다 불러들여 처리하면 많은 mapper가 돌게 되고 reduce 작업 또한 많아 지게 된다.

그래서 파일을 파티셔닝해서 읽어들이는 데이터를 적게 하는 전략인것이다. hdfs상에 파티션으로 사용할 필드?별로 파일을 저장하여 파티션으로 사용할 수 있다. 보통 일별, 월별, 년별 로 데이터를 조회하는 경우가 많으니 해당 조건(년,월,일)로 디렉토리를 생성하고 데이터를 적재한다. 그리고 이것을 파티션을 하여 hive  데이터 조회시에 이용한다. 

테이블 선언시에 이처럼 선언하여 할 수 도 있고,  

partitioned by(yyyy string, mm string, dd string)

테이블 생성 후에 alter 해서 사용 할 수 있다.

alter table test_table Add IF NOT EXISTS partition( yyyy='2015',mm='01',dd='15') location '/tmp/test/2015/01/15'

아마도 해당 파티션 필드만 이용하여 조회할 경우 데이터 사이즈를 아직 측정하지는 않았지만 mr을 돌지 않고 바로 나오는 듯 하다.

//
프로그래밍/ElasticSearch 2015. 6. 13. 00:22

엘라스틱서치 분석기를 통해 쪼개지는 term 을 확인할 수 있다.  rest api 로 제공하는데 analyzer의 작동을 확인해볼수 있다.

analyzer 부분을 생략하면  es  기본 분석기가 사용된다. 플러그인타입으로 자체 개발한 analyzer를 테스트 중에

웹브라우저에서 바로 분석 결과를 확인할 수 있는 api  있길래..

analyzer 파라미터로 분석기이름을 넣어주고  text 값으로 테스트할 텍스트를 넣어준다.


http://host:9200/index_name/_analyze?analyzer=test_analyzer&text=오늘은 뭐해요


//
프로그래밍/Hadoop ETC 2015. 6. 13. 00:16

hive 조회 결과 파일로 떨구기 

insert overwrite local directory '/tmp/result'

select * from test_tbl 


//