분류

2019년 2월 26일 화요일

대용량 데이터 처리 2. 오라클 병렬처리 parallel

1.PARALLEL은 언제 사용합니까?

병렬처리는 한 번에 대용량의 데이터를 처리할 경우 발생되는 부하를 여러 쓰레드에 분산처리 하는 기술입니다. 따라서 일반적인 조회나, 데이터 처리에는 INDEX를 통한 범위 처리를 주로 사용하게 됩니다. 병렬처리의 사용 빈도는 낮으며, 대용량 데이터 처리를 위한 프로시저나 프로그램에서 조심스럽게 사용됩니다. 개념적인 그림은 API에 잘 나와있어 참조하시면 되겠습니다.

오라클 병렬처리 API

이전 기술한 java 다중 쓰레드와 비슷한 구조를 갖고 있습니다. 자바의 thread와 비교한다면 thread.submit() 정도의 처리를 통해 개별 쓰레드에서 처리된 결과 집합을 재처리하는 기술이 잘 만들어져있다. 정도로 생각해도 괜찮을 것 같습니다.

2. 병렬처리 환경 정보 확인 

oracle 시스템의 환경 변수는 간단한 명령을 통해 확인할 수 있습니다. 
  select * from 
    v$parameter
  where 
  name like '%parallel%'; 

ORACLE 병렬처리 환경 문서 API
1. parallel_server : true 로 설정될 경우 병렬 서버 모드로 시작됨.
2. parallel_adaptive_multi_user : 시스템 과부하를 방지하기 위해 사용자의 병렬처리 수준을 제어합니다.

 예 : SELECT /+*PARALLEL(8)*/ * FROM DUAL; 명령의 PARALLEL(차수)에 해당함
      혹은 alter table emp parallel 8;
이처럼 차수를 지정하여 사용하는 것을 '수동 DOP' 옵티마이저가(쿼리 코디네이터라고 하기도 함) 자동으로 병렬 처리를 지정하는 것을 '자동 DOP'라고 합니다.

3. parallel_degree_limit : 자동 DOP가 사용 중일 때 한계 병렬처리 차수를 설정합니다.
(기본 값은 cpu 수 x cpu당 병렬처리 쓰레드 수 x 활성 인스턴스 수)
4. parallel_degree_policy : 자동 DOP 사용 여부를 제어, 병렬처리 큐 및 메모리 내 병렬처리에 사용됩니다. 기본적으로 비활성화 되어있습니다.
5. parallel_execution_message_size : 병렬 처리 시 서버와 쓰레드, 옵티마이저간에 사용되는 버퍼사이즈. 공유 메모리풀에서 할당됨.
6. parallel_force_local : 병렬 실행을 현재 인스턴스로 제한합니다.
7. parallel_max_servers : 한 인스턴스의 병렬처리와 병렬 복구처리에 사용될 최대 병렬 차수를 지정합니다. (인스턴스 시작 시점에 발생하는 미확정 트랜잭션의 처리와 복구에 관련된 변수인 것 같습니다.) 인스턴스 시작 시 해당 수치까지 프로세스를 증가시킵니다.
이 변수를 너무 작게 설정할 경우 일부 쿼리에서 병렬처리를 수행할 수 없게 됩니다.
8. parallel_min_servers : 오라클 데이터베이스 시작을 위해 시작 시와 병렬처리 제한에 사용될 병렬처리 프로세스 수를 지정합니다. 이러한 세팅은 병렬처리의 시작 비용을 균형적으로 만들지만 병렬 처리가 종료된 이후에도 메모리와 병렬 프로세스를 반환하지 않습니다. 데이터베이스 셧다운시 반환됩니다.
9. parallel_min_percent : 병렬 실행에 허용된 최소 쓰레드 비율, 기본 값은 0이며 병렬처리 에 사용할 수 있는 프로세스가 없는 경우 직렬 처리됨.
10. parallel_servers_target : 병렬처리가 사용되기 전 서버에서 병렬 처리 대기열이 사용할 수 있는 프로세스 수.  병렬 처리 대기열은 PARALLEL_DEGREE_POLICY가 AUTO로 설정되었을 경우 사용 가능합니다.
11. parallel_threads_per_cpu : CPU당 병렬처리에 사용가능한 쓰레드 수.

3. 병렬처리 사용 시 주의사항

ORACLE API 문서와 병렬처리 환경설정은 대게 사용자가 직접 작성한 힌트를 통한 SQL을 처리를 하는 것 보다는 자동 처리에 초점이 맞춰진 것 같습니다. 하지만 실무 환경에서 이런 옵션 설정만으로 병렬 처리를 수행 할 경우 위험할 수 있습니다.

예를 들자면 동시접속자가 800명인 시스템에서 자동 병렬처리를 16개씩 자동으로 사용하게 했다고 칩시다. 물론 동시에 조회 버튼을 땅~ 하고 칠 리는 없지만 그래도 동시에 조회버튼을 눌렀다고 상정 할 경우 사용해야하는 쓰레드의 수는 12800개입니다. 당연히 시스템에 과부하가 생길 것 입니다. 그래서 OLTP환경에서는 대게 사용하지 않습니다. 또한 동시접속자가 많을 것으로 예상되는 데이터베이스의 경우 병렬모드를 사용하지 않게 설정되었을 수 있습니다. 가장 최근 확인한 것은 v$parameter 테이블을 조회했을 경우 parallel_dgree_limit 을 변수가 존재하지 않는 환경을 확인했습니다.

DBA에게 병렬처리 수준에 대한 질의를 해야 하는 것도 이런 부분입니다. 데이터베이스 관리 시스템을 도입하는 시점에서 적정 성능을 보장받기 위해 여러가지 성능에 대한 테스트를 하는데 이 때, 병행 제어 수준에 대한 테스트도 수행하기에 해당 문서를 갖고 있다면 현행 DBMS의 사용자 수와 그에 맞는 병행 제어 수준을 도출할 수 있기 때문입니다.

OLTP 환경의 조회 SQL에서 수행이 느릴 경우 대책으로 가장 많이 사용되는 것은 PARALLEL이 아닌 SQL에 대한 제약조건 강화를 하는 것이 보편적이고, DELETE 속도가 느릴 경우 역시 PARALLEL처리보다는 해당 데이터 영역을 파티션 테이블로 만들어 파티션에 대한 TRUNCATE를 수행하는 경우가 더 많습니다.
(물론 TRUNCATE 역시 복구 불가에 대한 공포증으로 사용하지 않는 사람도 많습니다.)

병렬 처리는 만능이 아니며 부주의하게 사용할 경우 문제가 발생하기에 아예 기피하는 사람도 있습니다.

개인적으로는 파라미터 설정 중 parallel_max_server 값의 2.5%정도 수준에서 시작하여 5%내에서 사용하고 있습니다. 통상적으로 동시접속자 30인 이내의 데이터 가공 및 집계처리 환경의 DBMS를 사용하고 있고, 일과시간에 작업하는 프로그램이 대부분이기 때문입니다. 하지만 일과 외 사긴에 사용될 경우에는 병렬처리 수준을 더 높이기도 합니다. 정확한 기준이 없는 경우가 많기 때문이기도 합니다.

4. parallel 명령 사용 예 

가. DML 사용 예

1) SELECT , DELETE, INSERT, UPDATE, MERGE 명령의 바로 뒤에
   SELECT /*+PARALLEL(16)*/ * FROM 테이블;
   DELETE /*+PARALLEL(16)*/  FROM 테이블;

2) JOIN 시에는 테이블 마다(그렇지 않을 경우 테이블 수 X PARALLEL 개수만큼의 병렬 처리가 생성됨.
  SELECT /*+PARALLEL(A 16)(B 16)*/  * FROM 테이블 A , 테이블 B  WHERE A.ID = B.ID;

※ DBMS 설정에 따라 병렬처리 사용을 세션단위로 허용해야 하는 경우 아래 명령을 사용
ALTER SESSION ENABLE PARALLEL DML;

3) 병렬 처리와 일반처리의 실행계획 차이
SELECT * FROM TABLE 플랜 (병렬 힌트를 줬으나 미 적용시에도 동일)
-----------------------------------------------------------------------------------------
 SELECT STATMENT ALL_ROWS
    Cost : 332,704 Bytes:8,402,920,050 Cardinality : 52,192,050
      PARTITION LIST ALL
        Cost : 332,704 Bytes:8,402,920,050 Cardinality : 52,192,050
          Partition #: Partition accessed #1 - #7 
            TABLE ACCESS FULL TABLE...
             Cost : 332,704 Bytes:8,402,920,050 Cardinality : 52,192,050
                Partition #: Partition accessed #1 - #7
--------------------------------------------------------------------------------------------------------------

SELECT /*+PARALLEL(16)*/ FROM TABLE (병렬힌트가 적용되었을경우) 
병렬처리 코디네이터가 실행계획에 등장하게 됩니다. 
--------------------------------------------------------------------------------------------------------------
SELECT STATMENT ALL_ROWS
    Cost : 332,704 Bytes:8,402,920,050 Cardinality : 52,192,050
      PX COORDINATOR
          PX SEND QC(RANDOM) PARALLEL_TO_SERIAL_SYS.:TQ1000:Q1000
            Cost : 332,704 Bytes:8,402,920,050 Cardinality : 52,192,050
                   PX BLOCK ITERATOR PARALLEL_COMBINED_WITH_CHILD.:TQ1000:Q1000
                   Cost : 332,704 Bytes:8,402,920,050 Cardinality : 52,192,050
                        Partition #: Partition accessed #1 - #7 
                      TABLE ACCESS FULL TABLE...
                         Cost : 332,704 Bytes:8,402,920,050 Cardinality : 52,192,050
                               Partition #: Partition accessed #1 - #7
--------------------------------------------------------------------------------------------------------------

나. DDL 사용 예

테이블의 정의나 인덱스 정의에 병렬처리 수준을 지정할 경우 DML문을 사용할때 병렬처리 수준을 선언하지 않아도 자동적용이 됩니다. 조인 처리가 될 경우의 병렬 수준에 대한 고려는 직접 해본적이 없기에 저도 잘 모르겠습니다. 알게되면 좀더 보완하도록 하겠습니다.
1) 테이블, 인덱스 등의 변경
ALTER TABLE 테이블명 PARALLEL 16;
ALTER INDEX 인덱스명 PARALLEL 16;

2) 테이블 인덱스 등의 생성
CREATE TABLE 테이블명 (컬럼명 컬럼타입+길이,...) PARALLEL (DEGREE 16 INSTANCES 1)
CREATE INDEX 인덱스명 ON 테이블명(컬럼명...) PARALLEL  (DEGREE 16 INSTANCES 1)

5. 기타 

실행계획의 차이에 대해서 기술해보려 했으나 PARALLEL로 생성된 테이블이나 인덱스와, 힌트를 통한 /*+PARALLEL(16)*/의 실행 계획 차이는 정말 미묘했습니다. COST가 1정도 차이가 나거나, 거의 차이가 없는 유사한 모습이 보여, 차이라고 할 수 없는 애매모호한 상황이 되었습니다.

개인적으로는 OLTP환경에서는 DML을 이용하여 PARALLEL을 사용하는 것을 금하고 있습니다. 다중 사용자의 트랜잭션에 의해 발생할 수 있는 병렬처리 락 때문입니다. 반면 동시 접속자가 적고 통계 위주의 시스템 환경이라면 PARALLEL을 적극적으로 사용하는 편입니다.

이상입니다. 수정이나 의견이 있으신 분은 댓글 달아주시면 고맙겠습니다.

연관된 문서가 있습니다.
대용량 데이터 처리 1. java 다중쓰레드 활용
대용량 데이터 처리 2. 오라클 병렬처리 parallel
대용량 데이터 처리 3. 테이블 파티셔닝
대용량 데이터 처리 4. DBMS_JOB

2019년 2월 21일 목요일

대용량 데이터 처리 1. java 다중쓰레드 활용

1. multi thread 는 어디에 씁니까? 

개인적인 용도는 이렇습니다.

데이터 처리 위주의 웹서비스를 운영하다 보면 항상 발생하는 문제가 있습니다. 데이터 처리 시간이 지연되면서 발생하는 web의 응답 지연 현상. 단순하게 select 구문을 사용한 데이터 조회라면, SQL을 수정하면 되지만, 그렇지 않을 경우가 있습니다.

바로 대용량 text, json, excel 파일을 입력하는 경우 발생하는 문제입니다. 이중 가장 처리시간을 획기적으로 줄일 수 있는 것은 EXCEL입니다. EXCEL 파일은 스트림을 열어 최대 로우수와 시트수를 확인하고 여러 쓰레드로 분할하여 처리 할 수 있기 때문입니다.

그리고 TEXT, CSV, JSON파일의 경우 파일의 사이즈를 알 수는 있지만, 최대 행수를 미리 알 수 있는 방법이 없어 단일 스레드 처리를 하게 되며 처리 시간이 더욱 증대 되는 문제가 있습니다. 하지만 WEB에서 많이 사용하는 방식이기도 합니다.

WEB서비스는 SESSION의 제한 시간을 30분으로 설정하고 있는 경우가 많습니다. 세션 시간이 30분으로 제한되어있는 서비스의 경우 웹을 통한 요청이 30분이 초과되면 해당 프로세스를 종료시키는 로직이 추가되는 경우가 많기에 대용량의 CSV, TEXT 파일의 처리에는 대게 WEB서비스에서 분리시키는 쓰레드를 사용하게 됩니다.


2.multi thread 사용 시 주의사항 

WAS 서버의 환경을 우선 알아야 합니다. 대게 2가지 정도만 알면 되는데 적용할 서버의 HEAP SPACE와 THREAD POOL의 개수입니다. 둘 중 하나가 초과될 경우 심각한 오류와 함께 서버가 셧다운 되는 현상이 발생할 수 있으므로, 서버관리자에게 문의 후 결정하는 것이 좋습니다.

프로그래밍 시에도 thread를 무한정 늘리는 코드를 만드는 것은 하지 않는 것이 좋습니다.

일반적으로 가장 많이 범하는 오류가 thread.sleep(1000); 명령을 통해 쓰레드의 종료시간을 예상하여 쓰레드의 개수를 유지하는 방법입니다. 이런 방법으로 프로그래밍 할 경우 다양한 외부 환경의 변화로 인해 처리 속도가 지연될 경우 역시 시스템의 셧다운이 발생할 수 있습니다.

3. thread에 대한 설명

그래서 간략하게 몇 가지만 이야기 하겠습니다. 

1) 개념 적 이야기 

대략적인 내용을 그림으로 그리자면 아래와 같습니다. 


메인 쓰레드에서는 동시 처리를 수행할 작업량과, 수행 작업을 담은 작은 단위의 작업을 생성합니다. 생성된 작업은 메모리(힙 스페이스)에 적재되고, 작업 대기 중인 쓰레드에 할당되어 작업 처리를 수행합니다. 위와 같은 개념적인 구조를 갖고 있기에 다중 쓰레드 처리를 잘못 할 경우 쓰레드 문제가 발생하는 것 입니다. 

2) 흔히 발생하는 오류 

발생하는 문제는 대게 아래와 같습니다.

1. java.lang.OutOfMemoryError : unable to create new native thread 
  at java.lang.Thread.start0(Native Method)
  at java.lang.Thread.start(Thread.java:597)
  at java.util.concurrent.ThreadPoolExecutor.addifUnderMaximumPoolSize(ThreadPoolExecutor.java:727)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:657)

2. java.lang.OutOfMemoryError : Failed to create a thread retVal -1073741830, ernrno 11
  at java.lang.Thread.startImpl(Native Method) 
  at java.lang.Thread.start(Thread.java:886)

※ 1,2는 유사한 오류입니다. 너무 많은 쓰레드가 한 번에 호출되어 더 이상 쓰레드를 만들 수 없음. (시스템 사양에 따라 다른 것 같습니다. 개인 pc는 600개까지 되며 was 시스템에서 200개 까지 확인하였습니다.) 즉. 쓰레드의 종료 처리가 되지 않아 발생하는 문제 입니다. 

3. "Thread-20" java.lang.OutOfMemoryError : java heap space 
※ 개별 쓰레드에 할당된 작업의 크기가 heap 스페이스의 크기를 초과했을 경우 발생합니다. 

위 오류들의 패턴으로 봤을 때 쓰레드의 오류는 대게 할당된 메모리 영역을 벗어났을 때 발생하는 것 같습니다. 1,2번의 경우 할당할 수 있는 쓰레드 개수를 초과할 경우 (제 개인 pc에서는 600개의 쓰레드가 할당되지만, 타 시스템 적용시 200개밖에 되지 않음), 그리고 3번은 쓰레드에 할당할 힙 메모리가 없을 경우 발생했습니다.

4. thread 코드 작성

JDK 8버전입니다.
멀티 쓰레드를 호출할 메인 클래스 생성 TestMain.java 
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class TestMain {

static int maxThreadCnt = 4; //최대 쓰레드 수 
static int inPutRowCnt = 1000; //쓰레드별 입력행수
static boolean threadState = true; //쓰레드 그룹개수제한
        //쓰레드 그룹을 컨트롤 할 서비스 생성
private static ExecutorService ex = Executors.newCachedThreadPool(new ThreadFactory(){
@Override
public Thread newThread(Runnable r){
return new Thread(r);
}
}); 

//파일 처리를 위한 메인 쓰레드 
public static void main(String[] arg) {
String fPath = "D:/testwork/testdata/IFS_02-11-2019 20-54-22-00_timeSeries.dat"
try {
FileReader rd = new FileReader(fPath);
BufferedReader br = new BufferedReader(rd); 
String readLine
int rowcount = 0; 
ArrayList<String> list = new ArrayList<String>();
while ((readLine= br.readLine()) != null) {
rowcount++;
list.add(rowcount+","+readLine); 
if(rowcount%inPutRowCnt == 0){ 
//설정된 파일 행 수 만큼 분할하여 개별 쓰레드에 적재
Runnable r = new Testsub((ArrayList<String>) list.clone()); 
threadManage(r); 
list.clear();
}
}
//미처리 된 데이터에 대해 처리 
if(list.isEmpty()){
System.out.println("파일 처리 완료 ");
}else{
Runnable r = new Testsub((ArrayList<String>) list.clone());
r.run();

}

} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

//쓰레드 관리를 위한 쓰레드 관리함수 
private static void threadManage(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
               
//활성중인 쓰레드의 개수가 최대 쓰레드 개수보다 작을 경우 EXECUTOR사용
if(t.activeCount() < maxThreadCnt && threadState){
System.out.println("Active Thread count is : "+t.activeCount());
ex.execute(t);
}else{
//최대 쓰레드 개수에 도달 시 executor에게 작업완료 후 종료명령
if(ex.isTerminated()){
//활성 쓰레드가 모두 종료되었을 경우 새 Executor 할당 후 실행
threadState = true;
ex = Executors.newCachedThreadPool(new ThreadFactory(){
@Override
public Thread newThread(Runnable r){
return new Thread(r);
}
}); 
ex.execute(t);
}else{
if(threadState){  //쓰레드 종료 명령이 호출되지 않은 경우만 수행
threadState = false;
System.out.println("Thread group Shutdown");
ex.shutdown();
}
t.run(); //쓰레드 종료 명령이 완료될 때까진 메인 클래스가 직접수행 
}
}
}
}

하위 생성될 보조 쓰레드 Testsub.java
import java.util.ArrayList;
import java.util.HashMap;

public class Testsub implements Runnable{

ArrayList<String> pList

public Testsub(ArrayList<String> arrayList) {
pList = arrayList//전달받은 파라미터를 할당
}

@Override
public void run() {
ArrayList dataArray = new ArrayList();
for(int i = 0; i < pList.size(); i++){
  //개별 비즈니스 로직을 탑재 
String[] strArr pList.get(i).split(","); 
HashMap<String, Object> tmpMap new HashMap<>();
tmpMap.put("0"strArr[0]);
tmpMap.put("1"strArr[1]);
tmpMap.put("2"strArr[2]);
tmpMap.put("3"strArr[3]);
tmpMap.put("4"strArr[4]);
dataArray.add(tmpMap);
}
//DB적재를 위한 코드 작성해서 사용 
System.out.println("work complete dataCnt : "dataArray.size());
}
}

수행결과 콘솔
Active Thread count is : 1
Active Thread count is : 2
Active Thread count is : 3
Thread group Shutdown
work complete dataCnt : 1000
work complete dataCnt : 1000
work complete dataCnt : 1000
work complete dataCnt : 1000
....
....
work complete dataCnt : 150
파일 처리 완료 
work complete dataCnt : 1000

※ 참고사항
TestMain.java
일단 스트림에서 바로 데이터를 처리하지 않는 이유는 단건 insert의 경우 부하가 심하고, 데이터에 대한 비즈니스 로직이 탑재 될 경우 해당 연산처리 덕에 메인 프로세스가 느려지는 것을 방지하기 위해 메인 클래스에서는 데이터를 분할하여 전송만, 보조 쓰레드에서는 데이터를 입력하는 비즈로직을 수행하는 방식을 취하고 있습니다.

ExecutorService ex 를 사용하는 이유는 수행되고 있는 쓰레드를 그룹화 하여 수행 shutdown() 명령 하나로 '작업 완료 후 종료해' 라는 명령을 간결하게 내릴 수 있습니다. JDK나 WAS시스템의 환경에 따라 쓰레드의 개수를 제어하기 편리 합니다.

shutdown()의 명령 이후에는 isTerminated() 명령을 사용하여 쓰레드의 종료를 확인해야 합니다. 서비스의 종료가 완료되지 않은 상태에서 새 서비스를 추가 할 경우 TestSub.java에서 수행되는 개별 비즈니스 로직의 수행 속도에 따라 쓰레드가 무한하게 증식할 수 있습니다. 

Runnable new Testsub((ArrayList<String>) list.clone()); 
러너블은 생성하는 하는 단계에서 파라미터를 전달할 수 있습니다. 전달할 수 있는 것은 파라미터뿐만 아니라 SPRINGFRAMEWORK에서의 SERVICE도 가능합니다. VO도 가능하겠죠? (이 부분은 SQL 사용을 위한 Service만 전달해봐서 정확히 모르겠습니다. ) 

또한 list 자체를 전달하는 것이 아닌 list.clone()으로 복제를 생성하는 이유는 가공중인 상태의 리스트를 던질 경우 바로 뒤에 오는 초기화 부분 list.clear(); 에서 초기화 되면서 리스트에 담겼던 데이터가 사라집니다. 

쓰레드 호출 시 t.setDaemon(true);를 사용한 이유는 이상한 서버의 오류 때분입니다. 자바의 쓰레드는 데몬 쓰레드 이건 아니건 종료되면 가비지 컬렉터에 의해 자원이 회수되는 것이 정상이지만, 일부 was 서버에서 사용자 쓰래드가 종료되지 않아서 할당된 쓰레드가 모두 종료되지 않는 오류가 있었습니다. 그 이후 이렇게 쓰고 있습니다.

Testsub.java
Thread를 만든다더니 Runnable을 만든 이유는 상속이나 수행 측면에서 이용이 편리해서입니다. 

dataArray 를 사용하는 이유는 대량의 데이터 입력 시oracle.jdbc.driver.OracleDriver 의 배열 처리가 한 건씩 데이터 입력 SQL을 수행하는 것 보다 수백 배 속도가 빠르기 때문 입니다. 

1건씩 insert 처리 시 접속 -> SQL전송 -> 커밋 -> 접속해제를 반복하기에 자원의 소모가 심합니다.  그리고 속도도 느리기 때문에 스프링 환경에선 array, 기타 환경에선 executeBatch를 사용하는 것을 개인적으로 선호합니다. 

프로그램 구동 영상 (뭐 볼 필요 없겠지만 첨부합니다.)

이상입니다. 수정이나 의견이 있으신 분은 댓글 달아주시면 고맙겠습니다.

연관된 문서가 있습니다.
대용량 데이터 처리 1. java 다중쓰레드 활용
대용량 데이터 처리 2. 오라클 병렬처리 parallel
대용량 데이터 처리 3. 테이블 파티셔닝
대용량 데이터 처리 4. DBMS_JOB