- To process the files, I used AWS Lambda with a custom pandas layer to transform text files stored in an S3 input bucket.
- When a text file is uploaded to the input bucket, it triggers the Lambda function, which processes the file and writes a CSV to a separate output bucket.
## Step 1: Create a Custom Pandas Layer
- Lambda's built-in Python runtime does not include pandas, so we need to create a custom layer that provides it.
# Create a Dockerfile to build the layer in the correct Lambda environment
vi Dockerfile
FROM public.ecr.aws/lambda/python:3.12
RUN pip install pandas -t /opt/python/
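# Note: if the zip CLI is not present in this base image, add an install step first (e.g. RUN dnf install -y zip)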
RUN cd /opt && zip -r pandas-layer.zip python/
# Build and extract the layer
docker build -t pandas-layer-builder .
# The Lambda base image sets its own ENTRYPOINT, so override it in order to copy the zip out of the container
docker run --rm --entrypoint cp -v $(pwd):/output pandas-layer-builder /opt/pandas-layer.zip /output/
# Upload the layer
aws lambda publish-layer-version \
  --layer-name pandas-layer \
  --description "Pandas library for Lambda" \
  --zip-file fileb://pandas-layer.zip \
  --compatible-runtimes python3.12
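Optionally, you can sanity-check the zip layout and confirm the published layer version before moving on (a quick check, assuming pandas-layer.zip is in the current directory):
# Verify the zip contains the expected python/ directory layout
unzip -l pandas-layer.zip | head -20
# Confirm the newest published layer version
aws lambda list-layer-versions --layer-name pandas-layer --query 'LayerVersions[0].[Version,LayerVersionArn]' --output text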
## Step 2: Create the Lambda Function Code
### 2.1 Create the Function Directory
mkdir lambda-function
cd lambda-function
### 2.2 Create the Lambda Function
# lambda_function.py
import json
import boto3
import pandas as pd
from io import StringIO
import urllib.parse
import logging
from datetime import datetime

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """Lambda function to process files from S3, triggered by S3 events."""
    s3_client = boto3.client('s3')
    try:
        # Process each record in the event
        for record in event['Records']:
            # Get bucket and object key from the S3 event
            bucket = record['s3']['bucket']['name']
            key = urllib.parse.unquote_plus(record['s3']['object']['key'])
            logger.info(f"Processing file: s3://{bucket}/{key}")

            # Only process .txt files
            if not key.lower().endswith('.txt'):
                logger.info(f"Skipping non-txt file: {key}")
                continue

            # Read file from S3
            try:
                response = s3_client.get_object(Bucket=bucket, Key=key)
                file_content = response['Body'].read().decode('utf-8')
            except Exception as e:
                logger.error(f"Error reading file {key}: {str(e)}")
                continue

            # Parse the file content
            env_dict, services_set = parse_file_content(file_content)
            if not env_dict:
                logger.warning(f"No environments found in {key}")
                continue

            # Create DataFrame (environments as columns, services as rows)
            df = pd.DataFrame.from_dict(env_dict)

            # Generate output filename
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            base_name = key.replace('.txt', '').split('/')[-1]
            output_bucket = bucket.replace('-input-', '-output-')  # Assumes the bucket naming convention from Step 3
            output_key = f"processed/{base_name}_output_{timestamp}.csv"

            # Convert to CSV and upload
            csv_buffer = StringIO()
            df.to_csv(csv_buffer, index=True)
            csv_content = csv_buffer.getvalue()
            s3_client.put_object(
                Bucket=output_bucket,
                Key=output_key,
                Body=csv_content,
                ContentType='text/csv',
                Metadata={
                    'source-file': key,
                    'source-bucket': bucket,
                    'environments-count': str(len(env_dict)),
                    'services-count': str(len(services_set)),
                    'processed-timestamp': timestamp
                }
            )
            logger.info(f"Successfully processed {key} -> s3://{output_bucket}/{output_key}")

        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Files processed successfully',
                'processed_files': len(event['Records'])
            })
        }
    except Exception as e:
        logger.error(f"Error processing files: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': str(e)
            })
        }
def parse_file_content(file_content):
    """Parse the configuration file content into {environment: {service: version}}."""
    env_dict = {}
    services_set = set()
    current_env = None
    for line in file_content.split('\n'):
        line = line.strip()
        if not line:
            continue
        # Environment lines are marked with *
        if '*' in line:
            current_env = line.replace('*', '').strip()
            env_dict[current_env] = {}
            logger.info(f"Processing environment: {current_env}")
        elif current_env and ':' in line:
            # Parse service:version pairs
            parts = line.split(':', 1)
            if len(parts) == 2:
                service = parts[0].strip()
                version = parts[1].strip()
                env_dict[current_env][service] = version
                services_set.add(service)
    return env_dict, services_set
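Before packaging, you can optionally run a rough local check of the parser from the lambda-function directory (this assumes pandas and boto3 are installed locally, since importing lambda_function pulls both in):
# Quick local test of parse_file_content with a tiny sample
python3 -c "
from lambda_function import parse_file_content
sample = '*development\nweb-service:v1.0.0\napi-service:v1.1.0'
envs, services = parse_file_content(sample)
print(envs)      # {'development': {'web-service': 'v1.0.0', 'api-service': 'v1.1.0'}}
print(services)  # e.g. {'web-service', 'api-service'} (set order may vary)
"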
### 2.3 Create Function ZIP
zip -r function.zip lambda_function.py
# Check size
ls -lh function.zip
## Step 3: Create S3 Buckets (Input and Output)
# Set variables
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
REGION="us-east-1"  # Change as needed
STACK_NAME="file-parser"
INPUT_BUCKET="${STACK_NAME}-input-${ACCOUNT_ID}"
OUTPUT_BUCKET="${STACK_NAME}-output-${ACCOUNT_ID}"
# Create buckets
aws s3 mb s3://${INPUT_BUCKET} --region ${REGION}
aws s3 mb s3://${OUTPUT_BUCKET} --region ${REGION}
echo "Created buckets:"
echo "Input: ${INPUT_BUCKET}"
echo "Output: ${OUTPUT_BUCKET}"
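To confirm both buckets exist before wiring anything to them (a quick optional check):
# head-bucket returns an error if the bucket is missing or inaccessible
aws s3api head-bucket --bucket ${INPUT_BUCKET} && echo "Input bucket OK"
aws s3api head-bucket --bucket ${OUTPUT_BUCKET} && echo "Output bucket OK"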
## Step 4: Create IAM Role for Lambda
# Create trust policy file
vi trust-policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
### 4.2 Create IAM Role
# Create the role
aws iam create-role \
  --role-name lambda-file-parser-role \
  --assume-role-policy-document file://trust-policy.json
# Attach basic Lambda execution policy
aws iam attach-role-policy \
  --role-name lambda-file-parser-role \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
### 4.3 Create Custom S3 Policy
# Create S3 policy (written with a heredoc so the bucket variables are expanded)
cat > s3-policy.json <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::${INPUT_BUCKET}/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:PutObjectAcl"
      ],
      "Resource": "arn:aws:s3:::${OUTPUT_BUCKET}/*"
    }
  ]
}
EOF
# Create and attach policy
aws iam create-policy \
  --policy-name lambda-file-parser-s3-policy \
  --policy-document file://s3-policy.json
aws iam attach-role-policy \
  --role-name lambda-file-parser-role \
  --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/lambda-file-parser-s3-policy
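To confirm both the basic execution policy and the custom S3 policy are attached to the role:
aws iam list-attached-role-policies --role-name lambda-file-parser-role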
## Step 5: Create Lambda Function
### 5.1 Get Layer ARN
# Get the latest layer version ARN
LAYER_ARN=$(aws lambda list-layer-versions --layer-name pandas-layer --query 'LayerVersions[0].LayerVersionArn' --output text)
echo "Layer ARN: ${LAYER_ARN}"
### 5.2 Create Lambda Function
# Get role ARN
ROLE_ARN="arn:aws:iam::${ACCOUNT_ID}:role/lambda-file-parser-role"
# Create the function
aws lambda create-function \
  --function-name file-parser \
  --runtime python3.12 \
  --role ${ROLE_ARN} \
  --handler lambda_function.lambda_handler \
  --zip-file fileb://function.zip \
  --timeout 300 \
  --memory-size 512 \
  --layers ${LAYER_ARN} \
  --description "S3-triggered file parser using pandas"
echo "Lambda function created successfully!"
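To confirm the function was created with the layer attached and the expected settings (the fields queried here are standard get-function-configuration outputs):
aws lambda get-function-configuration \
  --function-name file-parser \
  --query '{Runtime:Runtime,Timeout:Timeout,MemorySize:MemorySize,Layers:Layers[].Arn}'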
## Step 6: Configure S3 Event Trigger
### 6.1 Add Lambda Permission for S3
# Get Lambda function ARN
FUNCTION_ARN=$(aws lambda get-function --function-name file-parser --query 'Configuration.FunctionArn' --output text)
# Add permission for S3 to invoke Lambda
aws lambda add-permission \
  --function-name file-parser \
  --principal s3.amazonaws.com \
  --action lambda:InvokeFunction \
  --source-arn arn:aws:s3:::${INPUT_BUCKET} \
  --statement-id s3-trigger-permission
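You can verify the permission was added by inspecting the function's resource-based policy:
aws lambda get-policy --function-name file-parser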
### 6.2 Create S3 Event Notification
# Create the notification configuration (written with a heredoc so ${FUNCTION_ARN} is expanded)
cat > notification-config.json <<EOF
{
  "LambdaConfigurations": [
    {
      "Id": "file-parser-trigger",
      "LambdaFunctionArn": "${FUNCTION_ARN}",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            {
              "Name": "suffix",
              "Value": ".txt"
            }
          ]
        }
      }
    }
  ]
}
EOF
# Apply notification configuration
aws s3api put-bucket-notification-configuration \
  --bucket ${INPUT_BUCKET} \
  --notification-configuration file://notification-config.json
echo "S3 trigger configured successfully!"
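To confirm the notification is now active on the input bucket:
aws s3api get-bucket-notification-configuration --bucket ${INPUT_BUCKET}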
## Step 7: Test the Setup
### 7.1 Create Test File
# Create a sample input file
cat > test-input.txt <<'EOF'
*development
web-service:v1.0.0
api-service:v1.1.0
database:postgres-13
*staging
web-service:v1.0.1
api-service:v1.1.0
database:postgres-13
redis:v6.2
*production
web-service:v1.0.1
api-service:v1.1.0
database:postgres-13
redis:v6.2
monitoring:v2.0.0
EOF
### 7.2 Upload and Test
# Upload test file
aws s3 cp test-input.txt s3://${INPUT_BUCKET}/
# Wait a few seconds, then check output bucket
sleep 10
aws s3 ls s3://${OUTPUT_BUCKET}/processed/
# Download the result
aws s3 cp s3://${OUTPUT_BUCKET}/processed/ ./ --recursive
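For a quick look at the generated CSV (the exact filename includes a timestamp, so a wildcard is used here):
head -n 10 *_output_*.csv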
### 7.3 Check Lambda Logs
# View Lambda logs
aws logs describe-log-groups --log-group-name-prefix /aws/lambda/file-parser
# Get recent logs
aws logs filter-log-events \
  --log-group-name /aws/lambda/file-parser \
  --start-time $(date -d '5 minutes ago' +%s)000
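With AWS CLI v2 you can also tail the log group live, which is handy while re-uploading test files (not available in CLI v1):
aws logs tail /aws/lambda/file-parser --follow --since 10m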
## Step 8: Monitor and Troubleshoot
### 8.1 CloudWatch Monitoring
# Check Lambda metrics
aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda \
  --metric-name Invocations \
  --dimensions Name=FunctionName,Value=file-parser \
  --statistics Sum \
  --period 300 \
  --start-time $(date -d '1 hour ago' --iso-8601=seconds) \
  --end-time $(date --iso-8601=seconds)
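It is also worth checking the Errors metric alongside Invocations (same command, different metric name):
aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda \
  --metric-name Errors \
  --dimensions Name=FunctionName,Value=file-parser \
  --statistics Sum \
  --period 300 \
  --start-time $(date -d '1 hour ago' --iso-8601=seconds) \
  --end-time $(date --iso-8601=seconds)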