728x90
반응형

비지니스 로직에서 logstash로 tcp를 바로 데이터를 전송한다면 아래와 같은 흐름으로 설계가 될 것입니다. 

 

logstash에 tcp로 직접 데이터 전송 시

 

이 경우 logstash 혹은 elasticsearch에 문제가 있을 경우 바로 데이터 손실이 발생할 수 있습니다. 

 

logstash와 elasticsearch의 문제 발생 혹은 점검 중 데이터 손실이 발생하지 않도록 전송할 데이터를 일시적으로 혹은 일정 기간 동안 보관했다가 logstash와 elasticsearch가 정상화 되었을 때 다시 전달할 수 있도록 아래와 같이 만들어 보겠습니다. 

 

개선된 데이터 전송 흐름

 

임시 저장소로는 Redis를 사용하고, 데이터 전송 흐름을 제어하기 위해 nodejs로 간단한 agent를 만들어 사용해 보겠습니다. 전달받은 로그는 모두 JSON 포멧의 텍스트로 가정합니다.

const http = require('http'); 
const redis = require('redis'); 
const Logstash = require('logstash-client'); 
const schedule = require('node-schedule'); 
const JSON = require('JSON'); 
const express = require('express'); 
const app = express(); 
const bodyParser = require('body-parser'); 
const cors = require('cors'); 

app.use(cors()); 
app.use(bodyParser.json()); 
app.use(bodyParser.urlencoded({ extended: true })); 

redis_cli = redis.createClient(6379,"127.0.0.1"); 
var logstash = new Logstash({  
	type: 'tcp',
	host: "your_logstash_ip",  
	port: "your_logstash_port"
}); 

const keyList = ["logstash"]; 
const hostConn = [logstash]; 
const maxCountForReading = 100; // Don't modify to -1 
var bWorking = false; 
var isEmpty = function(value){ 
	if( value == "" || value == null || value == undefined || ( value != null && typeof value == "object" && !Object.keys(value).length ) ){ 
    	return true 
    }else{ 
    	return false 
    } 
}; 
const waitFor = (ms) => new Promise(r => setTimeout(r, ms)); 
async function asyncForEach(array, callback) {  
	for (let index = 0; index < array.length; index++) {  
    	await callback(array[index], index, array)  
    } 
} 
Date.prototype.yyyymmdd = function() {
      return this.getUTCFullYear()  +
      "/" +  (this.getUTCMonth() + 1) +
      "/" +  this.getUTCDate();
};

const server = app.listen("port_be_received_from_your_app", ()=>{  
	console.log(" Start agent ! "); 
});

//	redis에서 데이터를 읽어 logstash로 전송합니다.
function rangeFromRedis(ls, key){  
	return new Promise( function (res, rej){  
    	redis_cli.lpop(key, function(err, data){  
        	if(isEmpty(data)){  
            	rej("null");  
            }else if(err){  
            	rej(err);  
            }else{  
            	try {
            		ls.send(JSON.parse(data)); // data는 json 포멧의 텍스트 파일로 가정합니다. 이를 json 데이터로 변환하여 logstash로 전달합니다.
                }catch(e){
                	//	record 'e' somewhere you want to write in 
                    console.log(" ERROR : ", e );
                    redis_cli.lpush(key,data);
                }finally{
                    res();
                }
            }  
        });  
    }); 
} 

//	전달받은 data를 redis에 기록합니다.
async function insertToRedis(key, element, expire_days){  
	return new Promise((resolve, reject) => {  
    	redis_cli.rpush(key, element, function(err, data){  
        	if(err)  
            	reject( err );  
            else{  
            	if(expire_days !== undefined)  
                	redis_cli.expireat(key, parseInt((+new Date/1000) + 86400*parseInt(expire_days))); // expire_days후 삭제되도록 유효기간을 설정합니다.
                resolve();  
            }  
        });  
    }); 
} 

app.use(function(req,res,next){  
	next(); 
}); 

//	기록할 데이터를 post로 your_address/put에 전달하면,
//	redis에 바로 전송할 데이터를 기록하고, 

app.route('/put').post(function(req,res,next){  
	req.accepts('application/json; charset=utf-8');  

	let buffer = Buffer.from(JSON.stringify(req.body));  
    insertToRedis( 'logstash', buffer.toString('utf8'));  
    var backupkey = 'logstash-' + new Date().yyyymmdd();  
    insertToRedis( backupkey, buffer.toString('utf8'), 7);  //  여분으로 데이터가 전송된 날짜를 키로하는 공간에 7일간 추가 보관합니다.
    res.json('{"result":"ok"}'); 
}); 

//	지정된 스케쥴에 upload 함수를 호출하면 redis에서 데이터를 읽어 
//	logstash로 전송을 시도하고 실패할 경우 다시 redis에 기록합니다.
async function upload(){  
	var loopIdx = 0;  
    await asyncForEach(keyList, async (key) => {  
    	var bConn = hostConn[loopIdx].connected;  
        if(bConn == false){  
        	console.log("failed to logstash connection!!");  
            await waitFor(1000);  
            if(++loopIdx >= keyList.length){  
            	bWorking = false;  
            }  
        }else{  
        	// find on redis with key 
            rangeFromRedis(hostConn[loopIdx], key).then( function(data){ 
            	if(++loopIdx >= keyList.length){  
                    bWorking = false;  
                }  
            }).catch(function(err){  
            	if(err != "null")  
                	console.log("upload error = " + err);  
                if(++loopIdx >= keyList.length){  
                	bWorking = false;  
                }  
            });  
        }  
    }); 
} 
async function doPeriodicWork(){  
    await upload();  
    bWorking = false; 
} 
var j = schedule.scheduleJob('* * * * * *', function(){  
	if(bWorking == true){  
    	return;  
    }  
    bWorking = true;  
    doPeriodicWork(); 
});

(* 위 코드는 샘플의 예외적인 종료 상황 및 각종 예외 상황에 대응하는 코드는 글의 길이 문제로 제외되어 있습니다.)

 

logstash의 pipeline은 아래와 같이 작성할 수 있습니다. 

input {
	tcp {
		codec => "json"
		port => 7020
	}
}

output {
	elasticsearch {
		hosts => ["your_elasticsearch_ip:9200"]
		index => "logstash-%{+YYYY.MM.DD}"
	}
}

 

이제 일어날 수 있는 여러 상황에 대해서 어떻게 작동하고 후속 처리를 해야 하는지 가정해 보겠습니다.

 

[정상 Flow]

- 로그를 전송하는 서비스들은 agent에서 제공하는 REST API인 /put 을 이용해서 전달 후 자기 할 일을 하면 됩니다.

- agent는 매 초마다 지정된 키에 기록된 데이터를 한 개 가져와 Logstash로 전송합니다. 

(한번에 여러 개의 데이터를 전송하려면 spop을 사용하시면 됩니다.)

 

[예외 상황]

1. logstash가 다운되거나 접속할 수 없을 경우

  >> agent가 Redis에서 pop한 데이터를 logstash로 전송을 시도할 때 에러가 발생.

       pop한 데이터를 다시 left push로 다음에 먼저 가져올 수 있게 삽입한다. 

       이를 logstash가 정상화 될 때까지 반복한다. 

 

2. Elasticsearch가 다운되거나 접속할 수 없을 경우

 >> logstash는 (데이터가 전달되는)이벤트가 발생할 경우 input, filter, output 플러그인을 거쳐 최종 output 과정이 완료 처리되어야 해당 이벤트가 완료되었다고 처리하고, 이 과정 중 문제가 발생하면 해당 이벤트를 메모리에 그대로 유지합니다. logstash 5.1 버전부터는 Persistant Queue(이하 PQ, 예외 상황을 대비하여 디스크에 이벤트를 기록)를 제공합니다.

 >> logstash의 이런 과정에도 불구하고 데이터 분실이 발생한 경우, 위 샘플 코드에서는 일자를 기준으로 인덱스를 생성하도록 되어 있으며, Redis에 7일간 해당 데이터를 기록하고 있습니다. 특정 일에 문제가 발생한 경우 Elasticsearch에서 특정 일의 데이터를 지우고 'logstash-yyyy/mm/dd' 로 기록된 key를 'logstash'키로 복사하여 다시 데이터를 전송하여 복구할 수 있습니다. 

 

3. Elasticsearch, Logstash 버전 업그레이드가 필요한 경우

 >> logstash를 잠시 종료해두시면 '예외 상황 1'의 상황이 되어 Redis에 데이터가 쌓이게 됩니다. 업그레이드 후 logstash가 정상화되면 데이터를 모두 장성적으로 전송됩니다.

 

4. agent가 다운된다면.. 

 >> agent는 로그를 받아 기록하고 주기적으로 전송하는 단순한 기능이라 거의 문제가 발생하지 않습니다. 만약 걱정된다면 예외상황에 대한 처리 및 로깅을 강화화고 pm2 를 사용하여 다운 될 경우 바로 재시작 되도록 해주시기 바랍니다. 

  

 

이상입니다. Redis와 NodeJS로 작성된 코드가 작동되어야 하지만 데이터 손실 여부를 덜어줄 수 있으니 꼭 활용해 보시기 바랍니다. :)

반응형

+ Recent posts