분류

2019년 2월 21일 목요일

대용량 데이터 처리 1. java 다중쓰레드 활용

1. multi thread 는 어디에 씁니까? 

개인적인 용도는 이렇습니다.

데이터 처리 위주의 웹서비스를 운영하다 보면 항상 발생하는 문제가 있습니다. 데이터 처리 시간이 지연되면서 발생하는 web의 응답 지연 현상. 단순하게 select 구문을 사용한 데이터 조회라면, SQL을 수정하면 되지만, 그렇지 않을 경우가 있습니다.

바로 대용량 text, json, excel 파일을 입력하는 경우 발생하는 문제입니다. 이중 가장 처리시간을 획기적으로 줄일 수 있는 것은 EXCEL입니다. EXCEL 파일은 스트림을 열어 최대 로우수와 시트수를 확인하고 여러 쓰레드로 분할하여 처리 할 수 있기 때문입니다.

그리고 TEXT, CSV, JSON파일의 경우 파일의 사이즈를 알 수는 있지만, 최대 행수를 미리 알 수 있는 방법이 없어 단일 스레드 처리를 하게 되며 처리 시간이 더욱 증대 되는 문제가 있습니다. 하지만 WEB에서 많이 사용하는 방식이기도 합니다.

WEB서비스는 SESSION의 제한 시간을 30분으로 설정하고 있는 경우가 많습니다. 세션 시간이 30분으로 제한되어있는 서비스의 경우 웹을 통한 요청이 30분이 초과되면 해당 프로세스를 종료시키는 로직이 추가되는 경우가 많기에 대용량의 CSV, TEXT 파일의 처리에는 대게 WEB서비스에서 분리시키는 쓰레드를 사용하게 됩니다.


2.multi thread 사용 시 주의사항 

WAS 서버의 환경을 우선 알아야 합니다. 대게 2가지 정도만 알면 되는데 적용할 서버의 HEAP SPACE와 THREAD POOL의 개수입니다. 둘 중 하나가 초과될 경우 심각한 오류와 함께 서버가 셧다운 되는 현상이 발생할 수 있으므로, 서버관리자에게 문의 후 결정하는 것이 좋습니다.

프로그래밍 시에도 thread를 무한정 늘리는 코드를 만드는 것은 하지 않는 것이 좋습니다.

일반적으로 가장 많이 범하는 오류가 thread.sleep(1000); 명령을 통해 쓰레드의 종료시간을 예상하여 쓰레드의 개수를 유지하는 방법입니다. 이런 방법으로 프로그래밍 할 경우 다양한 외부 환경의 변화로 인해 처리 속도가 지연될 경우 역시 시스템의 셧다운이 발생할 수 있습니다.

3. thread에 대한 설명

그래서 간략하게 몇 가지만 이야기 하겠습니다. 

1) 개념 적 이야기 

대략적인 내용을 그림으로 그리자면 아래와 같습니다. 


메인 쓰레드에서는 동시 처리를 수행할 작업량과, 수행 작업을 담은 작은 단위의 작업을 생성합니다. 생성된 작업은 메모리(힙 스페이스)에 적재되고, 작업 대기 중인 쓰레드에 할당되어 작업 처리를 수행합니다. 위와 같은 개념적인 구조를 갖고 있기에 다중 쓰레드 처리를 잘못 할 경우 쓰레드 문제가 발생하는 것 입니다. 

2) 흔히 발생하는 오류 

발생하는 문제는 대게 아래와 같습니다.

1. java.lang.OutOfMemoryError : unable to create new native thread 
  at java.lang.Thread.start0(Native Method)
  at java.lang.Thread.start(Thread.java:597)
  at java.util.concurrent.ThreadPoolExecutor.addifUnderMaximumPoolSize(ThreadPoolExecutor.java:727)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:657)

2. java.lang.OutOfMemoryError : Failed to create a thread retVal -1073741830, ernrno 11
  at java.lang.Thread.startImpl(Native Method) 
  at java.lang.Thread.start(Thread.java:886)

※ 1,2는 유사한 오류입니다. 너무 많은 쓰레드가 한 번에 호출되어 더 이상 쓰레드를 만들 수 없음. (시스템 사양에 따라 다른 것 같습니다. 개인 pc는 600개까지 되며 was 시스템에서 200개 까지 확인하였습니다.) 즉. 쓰레드의 종료 처리가 되지 않아 발생하는 문제 입니다. 

3. "Thread-20" java.lang.OutOfMemoryError : java heap space 
※ 개별 쓰레드에 할당된 작업의 크기가 heap 스페이스의 크기를 초과했을 경우 발생합니다. 

위 오류들의 패턴으로 봤을 때 쓰레드의 오류는 대게 할당된 메모리 영역을 벗어났을 때 발생하는 것 같습니다. 1,2번의 경우 할당할 수 있는 쓰레드 개수를 초과할 경우 (제 개인 pc에서는 600개의 쓰레드가 할당되지만, 타 시스템 적용시 200개밖에 되지 않음), 그리고 3번은 쓰레드에 할당할 힙 메모리가 없을 경우 발생했습니다.

4. thread 코드 작성

JDK 8버전입니다.
멀티 쓰레드를 호출할 메인 클래스 생성 TestMain.java 
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class TestMain {

static int maxThreadCnt = 4; //최대 쓰레드 수 
static int inPutRowCnt = 1000; //쓰레드별 입력행수
static boolean threadState = true; //쓰레드 그룹개수제한
        //쓰레드 그룹을 컨트롤 할 서비스 생성
private static ExecutorService ex = Executors.newCachedThreadPool(new ThreadFactory(){
@Override
public Thread newThread(Runnable r){
return new Thread(r);
}
}); 

//파일 처리를 위한 메인 쓰레드 
public static void main(String[] arg) {
String fPath = "D:/testwork/testdata/IFS_02-11-2019 20-54-22-00_timeSeries.dat"
try {
FileReader rd = new FileReader(fPath);
BufferedReader br = new BufferedReader(rd); 
String readLine
int rowcount = 0; 
ArrayList<String> list = new ArrayList<String>();
while ((readLine= br.readLine()) != null) {
rowcount++;
list.add(rowcount+","+readLine); 
if(rowcount%inPutRowCnt == 0){ 
//설정된 파일 행 수 만큼 분할하여 개별 쓰레드에 적재
Runnable r = new Testsub((ArrayList<String>) list.clone()); 
threadManage(r); 
list.clear();
}
}
//미처리 된 데이터에 대해 처리 
if(list.isEmpty()){
System.out.println("파일 처리 완료 ");
}else{
Runnable r = new Testsub((ArrayList<String>) list.clone());
r.run();

}

} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

//쓰레드 관리를 위한 쓰레드 관리함수 
private static void threadManage(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
               
//활성중인 쓰레드의 개수가 최대 쓰레드 개수보다 작을 경우 EXECUTOR사용
if(t.activeCount() < maxThreadCnt && threadState){
System.out.println("Active Thread count is : "+t.activeCount());
ex.execute(t);
}else{
//최대 쓰레드 개수에 도달 시 executor에게 작업완료 후 종료명령
if(ex.isTerminated()){
//활성 쓰레드가 모두 종료되었을 경우 새 Executor 할당 후 실행
threadState = true;
ex = Executors.newCachedThreadPool(new ThreadFactory(){
@Override
public Thread newThread(Runnable r){
return new Thread(r);
}
}); 
ex.execute(t);
}else{
if(threadState){  //쓰레드 종료 명령이 호출되지 않은 경우만 수행
threadState = false;
System.out.println("Thread group Shutdown");
ex.shutdown();
}
t.run(); //쓰레드 종료 명령이 완료될 때까진 메인 클래스가 직접수행 
}
}
}
}

하위 생성될 보조 쓰레드 Testsub.java
import java.util.ArrayList;
import java.util.HashMap;

public class Testsub implements Runnable{

ArrayList<String> pList

public Testsub(ArrayList<String> arrayList) {
pList = arrayList//전달받은 파라미터를 할당
}

@Override
public void run() {
ArrayList dataArray = new ArrayList();
for(int i = 0; i < pList.size(); i++){
  //개별 비즈니스 로직을 탑재 
String[] strArr pList.get(i).split(","); 
HashMap<String, Object> tmpMap new HashMap<>();
tmpMap.put("0"strArr[0]);
tmpMap.put("1"strArr[1]);
tmpMap.put("2"strArr[2]);
tmpMap.put("3"strArr[3]);
tmpMap.put("4"strArr[4]);
dataArray.add(tmpMap);
}
//DB적재를 위한 코드 작성해서 사용 
System.out.println("work complete dataCnt : "dataArray.size());
}
}

수행결과 콘솔
Active Thread count is : 1
Active Thread count is : 2
Active Thread count is : 3
Thread group Shutdown
work complete dataCnt : 1000
work complete dataCnt : 1000
work complete dataCnt : 1000
work complete dataCnt : 1000
....
....
work complete dataCnt : 150
파일 처리 완료 
work complete dataCnt : 1000

※ 참고사항
TestMain.java
일단 스트림에서 바로 데이터를 처리하지 않는 이유는 단건 insert의 경우 부하가 심하고, 데이터에 대한 비즈니스 로직이 탑재 될 경우 해당 연산처리 덕에 메인 프로세스가 느려지는 것을 방지하기 위해 메인 클래스에서는 데이터를 분할하여 전송만, 보조 쓰레드에서는 데이터를 입력하는 비즈로직을 수행하는 방식을 취하고 있습니다.

ExecutorService ex 를 사용하는 이유는 수행되고 있는 쓰레드를 그룹화 하여 수행 shutdown() 명령 하나로 '작업 완료 후 종료해' 라는 명령을 간결하게 내릴 수 있습니다. JDK나 WAS시스템의 환경에 따라 쓰레드의 개수를 제어하기 편리 합니다.

shutdown()의 명령 이후에는 isTerminated() 명령을 사용하여 쓰레드의 종료를 확인해야 합니다. 서비스의 종료가 완료되지 않은 상태에서 새 서비스를 추가 할 경우 TestSub.java에서 수행되는 개별 비즈니스 로직의 수행 속도에 따라 쓰레드가 무한하게 증식할 수 있습니다. 

Runnable new Testsub((ArrayList<String>) list.clone()); 
러너블은 생성하는 하는 단계에서 파라미터를 전달할 수 있습니다. 전달할 수 있는 것은 파라미터뿐만 아니라 SPRINGFRAMEWORK에서의 SERVICE도 가능합니다. VO도 가능하겠죠? (이 부분은 SQL 사용을 위한 Service만 전달해봐서 정확히 모르겠습니다. ) 

또한 list 자체를 전달하는 것이 아닌 list.clone()으로 복제를 생성하는 이유는 가공중인 상태의 리스트를 던질 경우 바로 뒤에 오는 초기화 부분 list.clear(); 에서 초기화 되면서 리스트에 담겼던 데이터가 사라집니다. 

쓰레드 호출 시 t.setDaemon(true);를 사용한 이유는 이상한 서버의 오류 때분입니다. 자바의 쓰레드는 데몬 쓰레드 이건 아니건 종료되면 가비지 컬렉터에 의해 자원이 회수되는 것이 정상이지만, 일부 was 서버에서 사용자 쓰래드가 종료되지 않아서 할당된 쓰레드가 모두 종료되지 않는 오류가 있었습니다. 그 이후 이렇게 쓰고 있습니다.

Testsub.java
Thread를 만든다더니 Runnable을 만든 이유는 상속이나 수행 측면에서 이용이 편리해서입니다. 

dataArray 를 사용하는 이유는 대량의 데이터 입력 시oracle.jdbc.driver.OracleDriver 의 배열 처리가 한 건씩 데이터 입력 SQL을 수행하는 것 보다 수백 배 속도가 빠르기 때문 입니다. 

1건씩 insert 처리 시 접속 -> SQL전송 -> 커밋 -> 접속해제를 반복하기에 자원의 소모가 심합니다.  그리고 속도도 느리기 때문에 스프링 환경에선 array, 기타 환경에선 executeBatch를 사용하는 것을 개인적으로 선호합니다. 

프로그램 구동 영상 (뭐 볼 필요 없겠지만 첨부합니다.)

이상입니다. 수정이나 의견이 있으신 분은 댓글 달아주시면 고맙겠습니다.

연관된 문서가 있습니다.
대용량 데이터 처리 1. java 다중쓰레드 활용
대용량 데이터 처리 2. 오라클 병렬처리 parallel
대용량 데이터 처리 3. 테이블 파티셔닝
대용량 데이터 처리 4. DBMS_JOB

댓글 없음:

댓글 쓰기