오픈소스 주키퍼에 대한 소스 분석.
QuorumPeerMain 으로 시작.
'프로그래밍 > zookeeper' 카테고리의 다른 글
zookeeper watcher 구현 (0) | 2015.10.18 |
---|
오픈소스 주키퍼에 대한 소스 분석.
QuorumPeerMain 으로 시작.
zookeeper watcher 구현 (0) | 2015.10.18 |
---|
하둡 파일 시스템에 파일을 저장할 때 내부 구조에 대해서 알아보고자 한다.
하둡 api들(put) 같은 명령어를 사용해서 파일을 hdfs상에 올리게 되는데 그 과정에 대해서 "시작하세요 하둡 프로그래밍" (저자:정재화) 책의 내용과 https://github.com/apache/hadoop 의 소스를 참고하여 정리한다.
기본적인 흐름 :
클라이언트의 요청 -> 네임노드와 통신 -> 권한 수락 -> 파일 저장 스트림 생성 -> 파일전송 -> 스트림 닫기
위와 같이 이루어집니다.
하둡 파일 시스템은 실제 파일들의 상태와 달리 일반 파일시스템과 같은 모양으로 다루기 위해 추상화를 시켜놓았다.
실제 하둡 소스에서 보면 FileSystem 이라는 클래스로 추상화되어져있다. 그리고 이 클래스를 상속하여
DistributedFileSystem클래스를 정의한다. 이 클래스의 create 메소드를 이용하여 입력 스트림을 만든다. 이 입력 스트림 역시 FSDataOutputStream 을 생성, 반환한다.
그리고 나서 네임노드와 통신을 하여 각종 조건(권한, 데이터서 여유공간의 확인 등)을 체크한다.
그 후 패킷(데이터)을 전송하기 시작한다. 리플리카셋 설정에 따라서 데이터 전송을 위한 파이프라인을 만들게 된다.
네임노드는 저장할 타겟 데이터노드를 알려주고 클라이언트에서 해당 첫번째 데이터 노드로 전송을 시작한다. 첫번째 데이터노드는 두번째로, 두번째는 세번째로 패킷을 전송 요청, 전송 한다. 전송을 마친 데이터노드는 전송이 완료되었다는 ack를 자신에게 데이터 전송요청한 노드로 다시 전송한다.
만약 위에 상황에서 fail 이된다면, 네임노드로부터 다시 장애가 생긴 노드를 제외한 데이터노드 리스트를 받아 위와 같은 순서로 재전송 하게 된다.
모든 데이터 노드에서 저장완료 ack 를 받게 되면 네임노드는 데이터가 정상 저장되었다는 것으로 인식하고 열어둔 파이프라인을 닫게 된다.
출처 : http://blrunner.com/?page=5
[Hadoop] 작은 파일들 합치기 (0) | 2016.05.10 |
---|---|
[HIVE] patition by 쿼리 (0) | 2015.12.08 |
[Hadoop] OOZIE 사용 (with hive, sqoop) (0) | 2015.06.17 |
[Hadoop] hive 파티션 설정, external table, datatype (0) | 2015.06.13 |
hive 조회 결과 파일로 떨구기 (0) | 2015.06.13 |
elasticsearch 에 analyzer plugin을 만들다보니 offsetAttribue 클래스를 사용하게 되었다
이부분이 잘못되게 되면 하이라이팅이 엉뚱하게 된다. 그래서 token stream에서 나온 토큰들을 짤라서 작업할 때는
offsetAttribue의 setoffset 메소드를 이용해 offset을 확실히 해주어야 한다.
[Elasticsearch] Decay function 에 대해서 (수정중) (0) | 2016.08.30 |
---|---|
[Elasticsearch] 분산환경에서의 인덱스용량과 검색 속도 (0) | 2016.02.21 |
[Elasticsearch] Index Mapping with java (0) | 2015.07.15 |
[Elasticsearch] 플러그인 설치 with jar (0) | 2015.07.09 |
[Elasticsearch] StringOutofIndex 에러 (0) | 2015.07.09 |
XContentBuilder mappings = null; try { mappings = XContentFactory.jsonBuilder(); mappings.startObject() .startObject("test_index") .field("dynamic","false") // dynamic mapping 을 거부한다. .startObject("_all").field("enabled",false).endObject() //모든 필드를 찾지 않도록 한다. .startObject("properties") // 인덱스 내부에 필드들은 properties 아래 만들어져야한다. .startObject("document_title") .field("type", "string") .startObject("norms").field("enabled", false).endObject() .field("store", "yes") // 인덱스에 추가하는 것 뿐 아니라 저장하여 디스플레이 가능하도록 setting .field("index", "analyzed") // 해당 필드 분석하도록 한다. .field("index_analyzer", "name_analyzer") // 인덱스시에 이용하는 analyzer를 명시할 수 있다. .field("search_analyzer", "name_analyzer") .endObject() .startObject("name_number").field("type", "long").field("store", "yes").endObject() .startObject("st_number").field("type", "long").field("store", "yes").endObject() .startObject("type_info") .field("type", "nested") // nested 항목으로 만들 수 있다. 이렇게 되면 하나의 document로 만들어지는 듯. .startObject("properties") .startObject("type_name") .field("type", "string") .endObject() .startObject("type_value") .field("type", "double") .endObject() .endObject() .endObject() .endObject() .endObject() .endObject(); } catch (IOException e) { e.printStackTrace(); } XContentBuilder indexSettings = null; try { indexSettings = XContentFactory.jsonBuilder(); indexSettings.startObject() .field("index.number_of_shards", 3) // 샤드 갯수 지정 .field("index.number_of_replicas", 3) // 레플리카 갯수 지정 // .field("index.store.type", "memory") // 메모리에 인덱스 데이터 저장 할 지 .endObject(); } catch (IOException e) { e.printStackTrace(); } CreateIndexRequestBuilder indexCreateRequest = client.admin().indices().prepareCreate(indexName) .addMapping("test_index", mappings) // 메핑 json 데이터를 인덱스 네임파라미터와 함께 setting .setSettings(indexSettings); // index setting 에 관한 정보는 setsettings라는 함수로 만듬 try{ indexCreateRequest.execute().actionGet(); }catch(Exception e){ e.printStackTrace(); }
[Elasticsearch] 분산환경에서의 인덱스용량과 검색 속도 (0) | 2016.02.21 |
---|---|
[Elasticsearch] lucene offsetAttribue (0) | 2015.07.19 |
[Elasticsearch] 플러그인 설치 with jar (0) | 2015.07.09 |
[Elasticsearch] StringOutofIndex 에러 (0) | 2015.07.09 |
[Elasticsearch] score 결과 보기 (explain) (0) | 2015.06.13 |
maven 패키징 & 카피 후에
elastic_path/bin/plugin --url file:///absolute/path/to/elasticsearch-cardinality-plugin-0.0.1.jar --install plugin_name
주의점 : file:// 를 꼭 붙여줘야한다는 것. 그냥 file path 예시인줄...
다른방법.
plugins 밑에 디렉토리에 플러그인 네임과 같은 디렉토리를 만들고 jar를 위치시킨다.
그리고 ES를 재시작한다.
[Elasticsearch] lucene offsetAttribue (0) | 2015.07.19 |
---|---|
[Elasticsearch] Index Mapping with java (0) | 2015.07.15 |
[Elasticsearch] StringOutofIndex 에러 (0) | 2015.07.09 |
[Elasticsearch] score 결과 보기 (explain) (0) | 2015.06.13 |
[Elasticsearch] elasticsearch analyze결과 보기 (0) | 2015.06.13 |
analyzer plugin을 수정하고 올리고 나서 몇번의 리스타트 후에 이런 에러를 만낫다.
마스터 노드의 로그상에서는 StringOutofIndex 에러만 나게 되는데 어디서 나는지에 대한 것은 찾아볼 수 없다.
그러다가 analyzer에서 token을 substring 하는 부분에서 테스트적으로 넣엇던 것이 실수로 머지 되어 패키징 했다는 것을 알고 수정.
결론 : elasticsearch의 로그가 생각보다 친절 하지 않다.
[Elasticsearch] Index Mapping with java (0) | 2015.07.15 |
---|---|
[Elasticsearch] 플러그인 설치 with jar (0) | 2015.07.09 |
[Elasticsearch] score 결과 보기 (explain) (0) | 2015.06.13 |
[Elasticsearch] elasticsearch analyze결과 보기 (0) | 2015.06.13 |
[Elasticsearch] Custom analyzer plugin 만들기 (0) | 2015.05.28 |
jobTracker=***** nameNode=hdfs://****** <-- hdfs 도메인 workflowApplicationPath=${nameNode}/workflow디렉토리 <- 서브밋한 잡 path oozie.coord.application.path=${workflowApplicationPath}/coordinator.xml <- 코디네이터 파일(여기서 시간간격 등을 조절 가능하다. oozie.use.system.libpath=true oozie.libpath=${workflowApplicationPath}/lib <- oozie job에 들어가는 mr jar 파일이 될 수도 있고 mysql jdbc jar 등이 위치하는 path ### coordinator name ### coordNameSuffix=20150115 ### about time ### timeoutCoord=10 concurrencyCoord=2 startDate=2015-01-15T12:40+0900 <-- job을 실행시킬 시작 시간 endDate=2022-01-01T00:00+0900 <-- job을 종료시킬시간 inputInitialTime=2015-01-15T11:40+0900 <-- input 데이터의 입력시간. 즉 job 시작시간 이전으로 설정함으로 써 한시간 이전 데이터를 읽어 들이겠다는 의미. outputInitialTime=2015-01-15T11:40+0900 timezoneCoord=Asia/Seoul timezoneDataSet=Asia/Seoul instanceIndex=-1 coordFrequency=1 inputFrequency=1 outputFrequency=1 ### path ### InputPath= hdfs상의 input data위치 outputPath= hdfs상의 output data 위치 ### delete conf ### deleteTmp=false ### mapred.map.tasks value ### maxSplitSize=33554432 아래와 같이 RDB로 넣기 위해 db커넥션 정보도 넣었다. jdbcUrl= jdbcUsername= jdbcPassword= tableName= filenamePattern=part-r-000*##################### coordinator.xml ########################
${timeoutCoord} ${concurrencyCoord} <----- input data 의 주기 및 데이터 위치 등을 셋팅 해준다. ${inputPath}/${YEAR}/${MONTH}/${DAY}/${HOUR} ${outputPath}/${YEAR}/${MONTH}/${DAY}/${HOUR} ${YEAR}, 등은 OOZIE에서 지원한다.${coord:current(instanceIndex)} ${coord:current(instanceIndex)} ${workflowApplicationPath}/workflow.xml queueName default inputRoot ${coord:dataIn('input')} outputRoot ${coord:dataOut('output')} inputYMDH ${coord:formatTime(coord:dateOffset(coord:nominalTime(), inputFrequency*instanceIndex, 'HOUR'),'yyyyMMddHH')}
${jobTracker} ${nameNode} ${workflowApplicationPath}/hive/hive-site.xml outputDir=${outputRoot} inputY=${inputY} oozie.hive.log.level DEBUG ${fs:exists(outputRoot) == "true"} ${jobTracker} ${nameNode} oozie.sqoop.log.level DEBUG export --connect ${jdbcURL} --username ${jdbcUsername} --password ${jdbcPassword} -m 3 --table ${exportTableName} --map-column-java DATE_IDX=java.sql.Date,column1=Integer,column2=Long --columns DATE_IDX, column1, column2 --input-fields-terminated-by \001 --export-dir ${outputRoot} failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
[HIVE] patition by 쿼리 (0) | 2015.12.08 |
---|---|
[Hadoop] hdfs에 데이터 올라가는 과정 (0) | 2015.08.21 |
[Hadoop] hive 파티션 설정, external table, datatype (0) | 2015.06.13 |
hive 조회 결과 파일로 떨구기 (0) | 2015.06.13 |
MR(mapreduce) with java (0) | 2015.03.28 |
루씬을 사용하는 ES는 루씬 스코어링 과정을 따른다. 따로 analyzer를 구현해서 사용하는 터라 특정 필드에 매칭되서 얼마의 scoring이 되는가를 확인하기 위해서 explain 을 사용할 수 있다. RDB의 explain과 비슷하게 scoring 절차, 결과를 볼 수 있다.
query 을 활용 한 curl 쿼리
curl -XGET 'localhost:9200/twitter/tweet/1/_explain' -d '{ "query" : { "term" : { "message" : "search" } } }'
query_string을 활용 한 curl 쿼리
curl -XGET 'localhost:9200/twitter/tweet/1/_explain?q=message:search'
"query_string": {
"query": "content:this AND available:yes"
}
이런식도 가능
"query_string": {
"query": "content:this OR that"
}
Json query
{ "explain": true, "query" : { "term" : { "user" : "kimchy" } } }
결과 json
{ "matches" : true, "explanation" : { "value" : 0.15342641, "description" : "fieldWeight(message:search in 0), product of:", "details" : [ { "value" : 1.0, "description" : "tf(termFreq(message:search)=1)" }, { "value" : 0.30685282, "description" : "idf(docFreq=1, maxDocs=1)" }, { "value" : 0.5, "description" : "fieldNorm(field=message, doc=0)" } ] } }
[Elasticsearch] 플러그인 설치 with jar (0) | 2015.07.09 |
---|---|
[Elasticsearch] StringOutofIndex 에러 (0) | 2015.07.09 |
[Elasticsearch] elasticsearch analyze결과 보기 (0) | 2015.06.13 |
[Elasticsearch] Custom analyzer plugin 만들기 (0) | 2015.05.28 |
[Elasticsearch] 샤드, 레플리카 갯수 설정 (0) | 2015.05.28 |
1. hive 의 작동
hive 란 놈이 hdfs에 있는 파일을 SQL을 이용해서 조회 할 수 있도록 하는 녀석인데
mapreduce generator 정도로 보면 될 것 같다. 결국 잡은 mr 로 돈다.
이놈을 조회 하기 위해서는 RDB같은 테이블을 생성해야한다, 보통 mysql과 연동하여 테이블 관련 메타정보들을 갖고 있게 한다.
2. table 만들기 , 관리
hive 테이블을 만들때는 두종류가 되는데 external, managed 테이블로 할 수 있다.
external로 하게 되면 테이블 형태만 만들고 hdfs의 위치를 location으로 잡아 줄 수 있다. 그럼 해당 위치(hdfs)에서 데이터를 직접 부른다. 테이블에 저장되는 데이터는 hdfs에 파일 형식(시퀀스, 텍스트)을 따른다. 해당 파일 형식을 stored as 로 지정 할 수 있다.
파일 구분자를 따로 특문(multi character)등을 이용할 경우 따로 serializer,desirializer 를 구현해야 하는데
row format serde 'org.apache.com.serde' 로 해당 클래스를 선언해서 사용 할 수 있다.
ex)
create EXTERNAL table test_data (
userid STRING,
csinfo map<string, string>)
partitioned by(type string, yyyy string, mm string, dd string, hh string)
row format serde 'org.apache.com.serde'
WITH SERDEPROPERTIES ("field.delimited"="", "collection.delimited"="&","mapkey.delimited"="=")
stored as textfile;
3. 데이터 타입
data_type : primitive_type, array_type, map_type, struct_type
하이브에서는 컬렉션 타입을 지원한다. 이부분이 장점인듯.
위에 생성한 테이블처럼 타입을 선언하면 사용할 수 있다.
4. 데이터 조회
일반적인 sql과 같이 이용할 수 있으며, 하지만 mr잡이라는 특수성을 이해해야 클러스터 노드들및 하이브에 무리를 주지 않을 수 있다.
예를 들면 특정조건없이 너무 많은 데이터를 select 할 경우 coudera 및 hive 서버가 다운될 위험도 있더라.
row format delimited
fields terminated by " " 로 delimeter 를 조절 할 수 있다.
insert overwrite local directory '/tmp' 로 select 결과 파일을 출력 할 수 있다. local을 빼개 되면 hdfs의 위치로 기본한다.)
4. 하이브 파티션
hive에는 partition이란 개념이 있는데 테이블을 만들때 partition by (yyyy string, mm string) 식으로 설정 할 수 있다.
파일(row단위로) 읽어서 처리하는것의 단점으로 모든 데이터를 다 불러들여 처리하면 많은 mapper가 돌게 되고 reduce 작업 또한 많아 지게 된다.
그래서 파일을 파티셔닝해서 읽어들이는 데이터를 적게 하는 전략인것이다. hdfs상에 파티션으로 사용할 필드?별로 파일을 저장하여 파티션으로 사용할 수 있다. 보통 일별, 월별, 년별 로 데이터를 조회하는 경우가 많으니 해당 조건(년,월,일)로 디렉토리를 생성하고 데이터를 적재한다. 그리고 이것을 파티션을 하여 hive 데이터 조회시에 이용한다.
테이블 선언시에 이처럼 선언하여 할 수 도 있고,
partitioned by(yyyy string, mm string, dd string)
테이블 생성 후에 alter 해서 사용 할 수 있다.
alter table test_table Add IF NOT EXISTS partition( yyyy='2015',mm='01',dd='15') location '/tmp/test/2015/01/15'
아마도 해당 파티션 필드만 이용하여 조회할 경우 데이터 사이즈를 아직 측정하지는 않았지만 mr을 돌지 않고 바로 나오는 듯 하다.
[Hadoop] hdfs에 데이터 올라가는 과정 (0) | 2015.08.21 |
---|---|
[Hadoop] OOZIE 사용 (with hive, sqoop) (0) | 2015.06.17 |
hive 조회 결과 파일로 떨구기 (0) | 2015.06.13 |
MR(mapreduce) with java (0) | 2015.03.28 |
MR job log 파일 뽑기 (0) | 2015.03.26 |
엘라스틱서치 분석기를 통해 쪼개지는 term 을 확인할 수 있다. rest api 로 제공하는데 analyzer의 작동을 확인해볼수 있다.
analyzer 부분을 생략하면 es 기본 분석기가 사용된다. 플러그인타입으로 자체 개발한 analyzer를 테스트 중에
웹브라우저에서 바로 분석 결과를 확인할 수 있는 api 있길래..
analyzer 파라미터로 분석기이름을 넣어주고 text 값으로 테스트할 텍스트를 넣어준다.
http://host:9200/index_name/_analyze?analyzer=test_analyzer&text=오늘은 뭐해요
[Elasticsearch] StringOutofIndex 에러 (0) | 2015.07.09 |
---|---|
[Elasticsearch] score 결과 보기 (explain) (0) | 2015.06.13 |
[Elasticsearch] Custom analyzer plugin 만들기 (0) | 2015.05.28 |
[Elasticsearch] 샤드, 레플리카 갯수 설정 (0) | 2015.05.28 |
Elasticsearch cluster health check (0) | 2015.05.15 |