plain text 형태로 S3에 업로드한 자료들이 맘에 걸려서 bucket에 올린 자료들을 모두 암호화 하기로 결정했습니다. 

 

여러 복잡한 절차들이 있을 줄 알았는데 생각보다 큰 불편함없이 마무리 할 수 있어서 좋긴 한데, 뭔가 하다 만 느낌이라 걱정이 싹 가시진 않네요. 

 

기존에 자료를 업로드 하는 app과 자료를 읽는 app 모두에서 별다른 조치를 할 필요는 없었습니다. 

 

IAM에 설정된 사용자에 정책을 추가하는 것과 Bucket에 암호화 설정하기. 그리고 기존 데이터 암호화하기로 모든 작업이 끝나네요. 이 설정이 편한게 데이터 암호화를 클라이언트가 직접 하는게 아닌 server side encryption이라 별로 손 댈게 없어서 좋았습니다. put, get 시 별도 하는게 없어 암호화 되었는지 아닌지를 체감하지 애매하지만요.

 

Bucket에 기본 암호화 설정하기

 

일단 Bucket 설정에서 기본 암호화 설정을 해보겠습니다. 

 

암호화 하려는 버킷의 설정에서 기본 암호화를 선택하면 좌측과 같은 메뉴가 활성화 됩니다. 저는 AES-256을 선택하겠습니다. 

 

이 경우 기존에 업로드된 파일이 암호화 되지는 않고, 새로 업로드되는 자료들만 암호화됩니다. 

 

아래에서 기존 파일들을 암호화 하는 과정을 확인할 수 있습니다.

 

 

 

 

AES-256을 설정하면 좌측과 같은 화면을 확인할 수 있습니다. 

 

"권한" -> "버킷 정책" 중 "Action" 중 업로드 시 암호화된 혹은 암호화 하지 않은 파일에 대한 정책이 별도로 있을 경우 거부될 수 있다는 메세지가 있습니다. 기존에 설정된 내용이 없다면 무시하고 넘어가도 됩니다.

 

AES-256에 대한 공식 문서에는 다음과 같이 설명되어 있습니다. "서버측 암호화를 통해 유휴 데이터를 보호합니다. Amazon S3는 고유한 키로 각 객체를 암호화합니다. 또한 추가 보안 조치로 주기적으로 바뀌는 마스터 키를 사용하여 키 자체를 암호화합니다. Amazon S3 서버 측 암호화는 가장 강력한 블록 암호 중 하나인 256비트 고급 암호화 표준(AES-256)을 사용하여 데이터를 암호화합니다." (링크)

 

 

 

이 설정을 하지 않고 원하는 객체만 암호화 하려면 putObject 시 ServerSidEncryption옵션을 지정하여 암호화 요청을 할 수 있습니다. NodeJS의 샘플 코드로 예를 들어보겠습니다. 

    const AWS = require('aws-sdk');
        .
    .
    this.s3 = new AWS.S3();
    .
    .
	this.s3.putObject({Bucket:'your_bucket_name', Key:'your_key', 
    	Body:your_data, ServerSideEncryption: 'AES256', function(err,data){
			if(err){
				console.log('s3 put : ' + err);
			}
			else{
				console.log('Successfully uploaded!');
			}
		});
        
      .
      .

 

IAM 계정에 권한 설정 하기

 

이런 암호화 처리를 위해서는 접근하는 IAM 계정에 권한이 필요합니다. 

 

IAM에서 S3에 PUT, GET을 처리하는 계정에 연결권 정책의 Action에 aes:Decrypt와 aes:Encrypt를 추가해야 합니다. 

 

적용된 샘플은 아래와 같습니다.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1540362430000",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject",
                "aes:Decrypt",
                "aes:Encrypt"
            ],
            "Resource": [
                "arn:aws:s3:::your_bucket_name"
            ]
        }
    ]
}

 

버킷의 기존 파일들 암호화 하기

 

이제 버킷에 있던 기존 파일들의 암호화를 변경해 보겠습니다. 

 

해당 버킷에서 암호화할 폴더 혹은 파일을 선택 후 "작업" -> "암호화 변경"을 선택합니다. 

 

그리고 AES-256을 선택 후 저장을 선택하면 선택된 객체 모두 암호화를 진행합니다. 암호화된 객체를 다운로드하면 복호화 하여 다운로드 하기 때문에 암호화 된 상태인지 확인할 수 없습니다. 암호화된 객체를 선택하여 개요 부분을 확인하면 아래와 같이 적용된 부분을 확인할 수 있습니다.

 

(* 서버 측 암호화 적용 결과)

 

KMS를 선택하면 원하는 키를 선택할 수 있습니다만.. 키 관리하고 https로만 처리되어야 하는 여러 번거로움 때문에 사용하지는 않았습니다. 

 

이상입니다. 생각보다 간단한데다 사용하는 코드에서도 별다른 처리가 필요없어 너무 손쉽게 끝나버린 "S3 Bucket 암호화 적용하기" 였습니다. 

비지니스 로직에서 logstash로 tcp를 바로 데이터를 전송한다면 아래와 같은 흐름으로 설계가 될 것입니다. 

 

logstash에 tcp로 직접 데이터 전송 시

 

이 경우 logstash 혹은 elasticsearch에 문제가 있을 경우 바로 데이터 손실이 발생할 수 있습니다. 

 

logstash와 elasticsearch의 문제 발생 혹은 점검 중 데이터 손실이 발생하지 않도록 전송할 데이터를 일시적으로 혹은 일정 기간 동안 보관했다가 logstash와 elasticsearch가 정상화 되었을 때 다시 전달할 수 있도록 아래와 같이 만들어 보겠습니다. 

 

개선된 데이터 전송 흐름

 

임시 저장소로는 Redis를 사용하고, 데이터 전송 흐름을 제어하기 위해 nodejs로 간단한 agent를 만들어 사용해 보겠습니다. 전달받은 로그는 모두 JSON 포멧의 텍스트로 가정합니다.

const http = require('http'); 
const redis = require('redis'); 
const Logstash = require('logstash-client'); 
const schedule = require('node-schedule'); 
const JSON = require('JSON'); 
const express = require('express'); 
const app = express(); 
const bodyParser = require('body-parser'); 
const cors = require('cors'); 

app.use(cors()); 
app.use(bodyParser.json()); 
app.use(bodyParser.urlencoded({ extended: true })); 

redis_cli = redis.createClient(6379,"127.0.0.1"); 
var logstash = new Logstash({  
	type: 'tcp',
	host: "your_logstash_ip",  
	port: "your_logstash_port"
}); 

const keyList = ["logstash"]; 
const hostConn = [logstash]; 
const maxCountForReading = 100; // Don't modify to -1 
var bWorking = false; 
var isEmpty = function(value){ 
	if( value == "" || value == null || value == undefined || ( value != null && typeof value == "object" && !Object.keys(value).length ) ){ 
    	return true 
    }else{ 
    	return false 
    } 
}; 
const waitFor = (ms) => new Promise(r => setTimeout(r, ms)); 
async function asyncForEach(array, callback) {  
	for (let index = 0; index < array.length; index++) {  
    	await callback(array[index], index, array)  
    } 
} 
Date.prototype.yyyymmdd = function() {
      return this.getUTCFullYear()  +
      "/" +  (this.getUTCMonth() + 1) +
      "/" +  this.getUTCDate();
};

const server = app.listen("port_be_received_from_your_app", ()=>{  
	console.log(" Start agent ! "); 
});

//	redis에서 데이터를 읽어 logstash로 전송합니다.
function rangeFromRedis(ls, key){  
	return new Promise( function (res, rej){  
    	redis_cli.lpop(key, function(err, data){  
        	if(isEmpty(data)){  
            	rej("null");  
            }else if(err){  
            	rej(err);  
            }else{  
            	try {
            		ls.send(JSON.parse(data)); // data는 json 포멧의 텍스트 파일로 가정합니다. 이를 json 데이터로 변환하여 logstash로 전달합니다.
                }catch(e){
                	//	record 'e' somewhere you want to write in 
                    console.log(" ERROR : ", e );
                    redis_cli.lpush(key,data);
                }finally{
                    res();
                }
            }  
        });  
    }); 
} 

//	전달받은 data를 redis에 기록합니다.
async function insertToRedis(key, element, expire_days){  
	return new Promise((resolve, reject) => {  
    	redis_cli.rpush(key, element, function(err, data){  
        	if(err)  
            	reject( err );  
            else{  
            	if(expire_days !== undefined)  
                	redis_cli.expireat(key, parseInt((+new Date/1000) + 86400*parseInt(expire_days))); // expire_days후 삭제되도록 유효기간을 설정합니다.
                resolve();  
            }  
        });  
    }); 
} 

app.use(function(req,res,next){  
	next(); 
}); 

//	기록할 데이터를 post로 your_address/put에 전달하면,
//	redis에 바로 전송할 데이터를 기록하고, 

app.route('/put').post(function(req,res,next){  
	req.accepts('application/json; charset=utf-8');  

	let buffer = Buffer.from(JSON.stringify(req.body));  
    insertToRedis( 'logstash', buffer.toString('utf8'));  
    var backupkey = 'logstash-' + new Date().yyyymmdd();  
    insertToRedis( backupkey, buffer.toString('utf8'), 7);  //  여분으로 데이터가 전송된 날짜를 키로하는 공간에 7일간 추가 보관합니다.
    res.json('{"result":"ok"}'); 
}); 

//	지정된 스케쥴에 upload 함수를 호출하면 redis에서 데이터를 읽어 
//	logstash로 전송을 시도하고 실패할 경우 다시 redis에 기록합니다.
async function upload(){  
	var loopIdx = 0;  
    await asyncForEach(keyList, async (key) => {  
    	var bConn = hostConn[loopIdx].connected;  
        if(bConn == false){  
        	console.log("failed to logstash connection!!");  
            await waitFor(1000);  
            if(++loopIdx >= keyList.length){  
            	bWorking = false;  
            }  
        }else{  
        	// find on redis with key 
            rangeFromRedis(hostConn[loopIdx], key).then( function(data){ 
            	if(++loopIdx >= keyList.length){  
                    bWorking = false;  
                }  
            }).catch(function(err){  
            	if(err != "null")  
                	console.log("upload error = " + err);  
                if(++loopIdx >= keyList.length){  
                	bWorking = false;  
                }  
            });  
        }  
    }); 
} 
async function doPeriodicWork(){  
    await upload();  
    bWorking = false; 
} 
var j = schedule.scheduleJob('* * * * * *', function(){  
	if(bWorking == true){  
    	return;  
    }  
    bWorking = true;  
    doPeriodicWork(); 
});

(* 위 코드는 샘플의 예외적인 종료 상황 및 각종 예외 상황에 대응하는 코드는 글의 길이 문제로 제외되어 있습니다.)

 

logstash의 pipeline은 아래와 같이 작성할 수 있습니다. 

input {
	tcp {
		codec => "json"
		port => 7020
	}
}

output {
	elasticsearch {
		hosts => ["your_elasticsearch_ip:9200"]
		index => "logstash-%{+YYYY.MM.DD}"
	}
}

 

이제 일어날 수 있는 여러 상황에 대해서 어떻게 작동하고 후속 처리를 해야 하는지 가정해 보겠습니다.

 

[정상 Flow]

- 로그를 전송하는 서비스들은 agent에서 제공하는 REST API인 /put 을 이용해서 전달 후 자기 할 일을 하면 됩니다.

- agent는 매 초마다 지정된 키에 기록된 데이터를 한 개 가져와 Logstash로 전송합니다. 

(한번에 여러 개의 데이터를 전송하려면 spop을 사용하시면 됩니다.)

 

[예외 상황]

1. logstash가 다운되거나 접속할 수 없을 경우

  >> agent가 Redis에서 pop한 데이터를 logstash로 전송을 시도할 때 에러가 발생.

       pop한 데이터를 다시 left push로 다음에 먼저 가져올 수 있게 삽입한다. 

       이를 logstash가 정상화 될 때까지 반복한다. 

 

2. Elasticsearch가 다운되거나 접속할 수 없을 경우

 >> logstash는 (데이터가 전달되는)이벤트가 발생할 경우 input, filter, output 플러그인을 거쳐 최종 output 과정이 완료 처리되어야 해당 이벤트가 완료되었다고 처리하고, 이 과정 중 문제가 발생하면 해당 이벤트를 메모리에 그대로 유지합니다. logstash 5.1 버전부터는 Persistant Queue(이하 PQ, 예외 상황을 대비하여 디스크에 이벤트를 기록)를 제공합니다.

 >> logstash의 이런 과정에도 불구하고 데이터 분실이 발생한 경우, 위 샘플 코드에서는 일자를 기준으로 인덱스를 생성하도록 되어 있으며, Redis에 7일간 해당 데이터를 기록하고 있습니다. 특정 일에 문제가 발생한 경우 Elasticsearch에서 특정 일의 데이터를 지우고 'logstash-yyyy/mm/dd' 로 기록된 key를 'logstash'키로 복사하여 다시 데이터를 전송하여 복구할 수 있습니다. 

 

3. Elasticsearch, Logstash 버전 업그레이드가 필요한 경우

 >> logstash를 잠시 종료해두시면 '예외 상황 1'의 상황이 되어 Redis에 데이터가 쌓이게 됩니다. 업그레이드 후 logstash가 정상화되면 데이터를 모두 장성적으로 전송됩니다.

 

4. agent가 다운된다면.. 

 >> agent는 로그를 받아 기록하고 주기적으로 전송하는 단순한 기능이라 거의 문제가 발생하지 않습니다. 만약 걱정된다면 예외상황에 대한 처리 및 로깅을 강화화고 pm2 를 사용하여 다운 될 경우 바로 재시작 되도록 해주시기 바랍니다. 

  

 

이상입니다. Redis와 NodeJS로 작성된 코드가 작동되어야 하지만 데이터 손실 여부를 덜어줄 수 있으니 꼭 활용해 보시기 바랍니다. :)

+ Recent posts