일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- JS
- MSSQL
- AWS
- 엘라스틱서치
- Git
- 영어
- 구글
- 유니티
- Ai
- unity
- mariadb
- MySQL
- error
- ChatGPT
- logstash
- s3
- 설정
- JavaScript
- build
- Python
- docker
- sample
- elasticsearch
- Windows
- nodejs
- Kibana
- Linux
- API
- ssh
- Today
- Total
가끔 보자, 하늘.
logstash tcp로 데이터 전송 시 손실없는 데이터 전달 흐름 설계 본문
비지니스 로직에서 logstash로 tcp를 바로 데이터를 전송한다면 아래와 같은 흐름으로 설계가 될 것입니다.
이 경우 logstash 혹은 elasticsearch에 문제가 있을 경우 바로 데이터 손실이 발생할 수 있습니다.
logstash와 elasticsearch의 문제 발생 혹은 점검 중 데이터 손실이 발생하지 않도록 전송할 데이터를 일시적으로 혹은 일정 기간 동안 보관했다가 logstash와 elasticsearch가 정상화 되었을 때 다시 전달할 수 있도록 아래와 같이 만들어 보겠습니다.
임시 저장소로는 Redis를 사용하고, 데이터 전송 흐름을 제어하기 위해 nodejs로 간단한 agent를 만들어 사용해 보겠습니다. 전달받은 로그는 모두 JSON 포멧의 텍스트로 가정합니다.
const http = require('http');
const redis = require('redis');
const Logstash = require('logstash-client');
const schedule = require('node-schedule');
const JSON = require('JSON');
const express = require('express');
const app = express();
const bodyParser = require('body-parser');
const cors = require('cors');
app.use(cors());
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));
redis_cli = redis.createClient(6379,"127.0.0.1");
var logstash = new Logstash({
type: 'tcp',
host: "your_logstash_ip",
port: "your_logstash_port"
});
const keyList = ["logstash"];
const hostConn = [logstash];
const maxCountForReading = 100; // Don't modify to -1
var bWorking = false;
var isEmpty = function(value){
if( value == "" || value == null || value == undefined || ( value != null && typeof value == "object" && !Object.keys(value).length ) ){
return true
}else{
return false
}
};
const waitFor = (ms) => new Promise(r => setTimeout(r, ms));
async function asyncForEach(array, callback) {
for (let index = 0; index < array.length; index++) {
await callback(array[index], index, array)
}
}
Date.prototype.yyyymmdd = function() {
return this.getUTCFullYear() +
"/" + (this.getUTCMonth() + 1) +
"/" + this.getUTCDate();
};
const server = app.listen("port_be_received_from_your_app", ()=>{
console.log(" Start agent ! ");
});
// redis에서 데이터를 읽어 logstash로 전송합니다.
function rangeFromRedis(ls, key){
return new Promise( function (res, rej){
redis_cli.lpop(key, function(err, data){
if(isEmpty(data)){
rej("null");
}else if(err){
rej(err);
}else{
try {
ls.send(JSON.parse(data)); // data는 json 포멧의 텍스트 파일로 가정합니다. 이를 json 데이터로 변환하여 logstash로 전달합니다.
}catch(e){
// record 'e' somewhere you want to write in
console.log(" ERROR : ", e );
redis_cli.lpush(key,data);
}finally{
res();
}
}
});
});
}
// 전달받은 data를 redis에 기록합니다.
async function insertToRedis(key, element, expire_days){
return new Promise((resolve, reject) => {
redis_cli.rpush(key, element, function(err, data){
if(err)
reject( err );
else{
if(expire_days !== undefined)
redis_cli.expireat(key, parseInt((+new Date/1000) + 86400*parseInt(expire_days))); // expire_days후 삭제되도록 유효기간을 설정합니다.
resolve();
}
});
});
}
app.use(function(req,res,next){
next();
});
// 기록할 데이터를 post로 your_address/put에 전달하면,
// redis에 바로 전송할 데이터를 기록하고,
app.route('/put').post(function(req,res,next){
req.accepts('application/json; charset=utf-8');
let buffer = Buffer.from(JSON.stringify(req.body));
insertToRedis( 'logstash', buffer.toString('utf8'));
var backupkey = 'logstash-' + new Date().yyyymmdd();
insertToRedis( backupkey, buffer.toString('utf8'), 7); // 여분으로 데이터가 전송된 날짜를 키로하는 공간에 7일간 추가 보관합니다.
res.json('{"result":"ok"}');
});
// 지정된 스케쥴에 upload 함수를 호출하면 redis에서 데이터를 읽어
// logstash로 전송을 시도하고 실패할 경우 다시 redis에 기록합니다.
async function upload(){
var loopIdx = 0;
await asyncForEach(keyList, async (key) => {
var bConn = hostConn[loopIdx].connected;
if(bConn == false){
console.log("failed to logstash connection!!");
await waitFor(1000);
if(++loopIdx >= keyList.length){
bWorking = false;
}
}else{
// find on redis with key
rangeFromRedis(hostConn[loopIdx], key).then( function(data){
if(++loopIdx >= keyList.length){
bWorking = false;
}
}).catch(function(err){
if(err != "null")
console.log("upload error = " + err);
if(++loopIdx >= keyList.length){
bWorking = false;
}
});
}
});
}
async function doPeriodicWork(){
await upload();
bWorking = false;
}
var j = schedule.scheduleJob('* * * * * *', function(){
if(bWorking == true){
return;
}
bWorking = true;
doPeriodicWork();
});
(* 위 코드는 샘플의 예외적인 종료 상황 및 각종 예외 상황에 대응하는 코드는 글의 길이 문제로 제외되어 있습니다.)
logstash의 pipeline은 아래와 같이 작성할 수 있습니다.
input {
tcp {
codec => "json"
port => 7020
}
}
output {
elasticsearch {
hosts => ["your_elasticsearch_ip:9200"]
index => "logstash-%{+YYYY.MM.DD}"
}
}
이제 일어날 수 있는 여러 상황에 대해서 어떻게 작동하고 후속 처리를 해야 하는지 가정해 보겠습니다.
[정상 Flow]
- 로그를 전송하는 서비스들은 agent에서 제공하는 REST API인 /put 을 이용해서 전달 후 자기 할 일을 하면 됩니다.
- agent는 매 초마다 지정된 키에 기록된 데이터를 한 개 가져와 Logstash로 전송합니다.
(한번에 여러 개의 데이터를 전송하려면 spop을 사용하시면 됩니다.)
[예외 상황]
1. logstash가 다운되거나 접속할 수 없을 경우
>> agent가 Redis에서 pop한 데이터를 logstash로 전송을 시도할 때 에러가 발생.
pop한 데이터를 다시 left push로 다음에 먼저 가져올 수 있게 삽입한다.
이를 logstash가 정상화 될 때까지 반복한다.
2. Elasticsearch가 다운되거나 접속할 수 없을 경우
>> logstash는 (데이터가 전달되는)이벤트가 발생할 경우 input, filter, output 플러그인을 거쳐 최종 output 과정이 완료 처리되어야 해당 이벤트가 완료되었다고 처리하고, 이 과정 중 문제가 발생하면 해당 이벤트를 메모리에 그대로 유지합니다. logstash 5.1 버전부터는 Persistant Queue(이하 PQ, 예외 상황을 대비하여 디스크에 이벤트를 기록)를 제공합니다.
>> logstash의 이런 과정에도 불구하고 데이터 분실이 발생한 경우, 위 샘플 코드에서는 일자를 기준으로 인덱스를 생성하도록 되어 있으며, Redis에 7일간 해당 데이터를 기록하고 있습니다. 특정 일에 문제가 발생한 경우 Elasticsearch에서 특정 일의 데이터를 지우고 'logstash-yyyy/mm/dd' 로 기록된 key를 'logstash'키로 복사하여 다시 데이터를 전송하여 복구할 수 있습니다.
3. Elasticsearch, Logstash 버전 업그레이드가 필요한 경우
>> logstash를 잠시 종료해두시면 '예외 상황 1'의 상황이 되어 Redis에 데이터가 쌓이게 됩니다. 업그레이드 후 logstash가 정상화되면 데이터를 모두 장성적으로 전송됩니다.
4. agent가 다운된다면..
>> agent는 로그를 받아 기록하고 주기적으로 전송하는 단순한 기능이라 거의 문제가 발생하지 않습니다. 만약 걱정된다면 예외상황에 대한 처리 및 로깅을 강화화고 pm2 를 사용하여 다운 될 경우 바로 재시작 되도록 해주시기 바랍니다.
이상입니다. Redis와 NodeJS로 작성된 코드가 작동되어야 하지만 데이터 손실 여부를 덜어줄 수 있으니 꼭 활용해 보시기 바랍니다. :)
'개발 이야기 > DB, 데이터분석, AI' 카테고리의 다른 글
MSSQL 버전 별 암호화 지원 정리 (0) | 2020.01.30 |
---|---|
MSSQL Linked Server 설정 방법 (0) | 2020.01.27 |
Collo - slideshare 링크 공유 (0) | 2019.11.05 |
Collo - 실시간 마이그레이션 툴 (0) | 2019.09.30 |
MariaDB, Galera Cluster, MaxScale 전체 정리 (0) | 2019.07.04 |