Transformed TXT files to CSV using an AWS Lambda function

  • To do this, I used AWS Lambda with a custom pandas layer to transform text files stored in an S3 input bucket.
  • When a text file is uploaded to the input bucket, it triggers the Lambda function, which parses the file and writes a CSV to a separate output bucket. A small illustrative example of the transformation is sketched below.
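
For illustration, assume an input file that groups service:version pairs under environment headers marked with * (a hypothetical example; the full test file used later is shown in Step 7):

Input (example.txt):
*development
web-service:v1.0.0
api-service:v1.1.0

*staging
web-service:v1.0.1
api-service:v1.1.0

Output (example_output_<timestamp>.csv), with environments as columns and services as rows (exact row order may vary):
,development,staging
web-service,v1.0.0,v1.0.1
api-service,v1.1.0,v1.1.0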

## Step 1: Built a Custom pandas Layer

### 1.1 Created a Dockerfile to build the layer in the right environment

  • Lambda's built-in runtime does not include pandas, so we need to create a custom layer containing it.

vi Dockerfile

FROM public.ecr.aws/lambda/python:3.12

# Install pandas into the directory structure Lambda layers expect (/opt/python)
RUN pip install pandas -t /opt/python/

# Zip up the layer contents (if the zip utility is missing from the base image, install it first, e.g. RUN dnf install -y zip)
RUN cd /opt && zip -r pandas-layer.zip python/

### 1.2 Built and extracted the layer

docker build -t pandas-layer-builder .

# Override the image's default ENTRYPOINT so we can run cp directly
docker run --rm --entrypoint cp -v $(pwd):/output pandas-layer-builder /opt/pandas-layer.zip /output/

### 1.3 Uploaded the layer

aws lambda publish-layer-version \
  --layer-name pandas-layer \
  --description "Pandas library for Lambda" \
  --zip-file fileb://pandas-layer.zip \
  --compatible-runtimes python3.12

## Step 2: Creating Lambda Function Code

### 2.1 Created the function directory

# Create the function directory
mkdir lambda-function
cd lambda-function

### 2.2 Created the Lambda Function

# lambda_function.py
import json
import logging
import urllib.parse
from datetime import datetime
from io import StringIO

import boto3
import pandas as pd

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """Lambda function to process files from S3, triggered by S3 events."""
    s3_client = boto3.client('s3')

    try:
        # Process each record in the event
        for record in event['Records']:
            # Get bucket and object key from the S3 event
            bucket = record['s3']['bucket']['name']
            key = urllib.parse.unquote_plus(record['s3']['object']['key'])

            logger.info(f"Processing file: s3://{bucket}/{key}")

            # Only process .txt files
            if not key.lower().endswith('.txt'):
                logger.info(f"Skipping non-txt file: {key}")
                continue

            # Read the file from S3
            try:
                response = s3_client.get_object(Bucket=bucket, Key=key)
                file_content = response['Body'].read().decode('utf-8')
            except Exception as e:
                logger.error(f"Error reading file {key}: {str(e)}")
                continue

            # Parse the file content
            env_dict, services_set = parse_file_content(file_content)

            if not env_dict:
                logger.warning(f"No environments found in {key}")
                continue

            # Create a DataFrame: environments as columns, services as rows
            df = pd.DataFrame.from_dict(env_dict)

            # Generate the output location
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            base_name = key.replace('.txt', '').split('/')[-1]
            output_bucket = bucket.replace('-input-', '-output-')  # Assumes the bucket naming convention from Step 3
            output_key = f"processed/{base_name}_output_{timestamp}.csv"

            # Convert to CSV and upload
            csv_buffer = StringIO()
            df.to_csv(csv_buffer, index=True)
            csv_content = csv_buffer.getvalue()

            s3_client.put_object(
                Bucket=output_bucket,
                Key=output_key,
                Body=csv_content,
                ContentType='text/csv',
                Metadata={
                    'source-file': key,
                    'source-bucket': bucket,
                    'environments-count': str(len(env_dict)),
                    'services-count': str(len(services_set)),
                    'processed-timestamp': timestamp
                }
            )

            logger.info(f"Successfully processed {key} -> s3://{output_bucket}/{output_key}")

        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Files processed successfully',
                'processed_files': len(event['Records'])
            })
        }

    except Exception as e:
        logger.error(f"Error processing files: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': str(e)
            })
        }

def parse_file_content(file_content):
    """Parse the configuration file content into {environment: {service: version}}."""
    env_dict = {}
    services_set = set()
    current_env = None

    for line in file_content.split('\n'):
        line = line.strip()
        if not line:
            continue

        # Environment lines are marked with *
        if line.startswith('*') or '*' in line:
            current_env = line.replace('*', '').strip()
            env_dict[current_env] = {}
            logger.info(f"Processing environment: {current_env}")
        elif current_env and ':' in line:
            # Parse service:version pairs
            parts = line.split(':', 1)
            if len(parts) == 2:
                service = parts[0].strip()
                version = parts[1].strip()
                env_dict[current_env][service] = version
                services_set.add(service)

    return env_dict, services_set
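
As an optional local sanity check before deploying, the parser can be exercised against a small sample. This is a minimal sketch (the file name test_parse_local.py is just for this example; it assumes pandas and boto3 are installed locally so lambda_function imports cleanly):

# test_parse_local.py -- hypothetical local check, not part of the deployed package
from lambda_function import parse_file_content
import pandas as pd

sample = """*development
web-service:v1.0.0
api-service:v1.1.0

*staging
web-service:v1.0.1
api-service:v1.1.0
redis:v6.2
"""

env_dict, services = parse_file_content(sample)
# env_dict -> {'development': {'web-service': 'v1.0.0', 'api-service': 'v1.1.0'},
#              'staging': {'web-service': 'v1.0.1', 'api-service': 'v1.1.0', 'redis': 'v6.2'}}
df = pd.DataFrame.from_dict(env_dict)
print(df.to_csv(index=True))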


### 2.3 Create Function ZIP
zip -r function.zip lambda_function.py

# Check size
ls -lh function.zip

## Step 3: Created S3 Buckets (Input and output)

# Set variables
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
REGION="us-east-1"  # Change as needed
STACK_NAME="file-parser"

INPUT_BUCKET="${STACK_NAME}-input-${ACCOUNT_ID}"
OUTPUT_BUCKET="${STACK_NAME}-output-${ACCOUNT_ID}"

# Created buckets
aws s3 mb s3://${INPUT_BUCKET} --region ${REGION}
aws s3 mb s3://${OUTPUT_BUCKET} --region ${REGION}

echo "Created buckets:"
echo "Input: ${INPUT_BUCKET}"
echo "Output: ${OUTPUT_BUCKET}"

## Step 4: Create IAM Role for Lambda

# Create trust policy file
vi trust-policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

### 4.2 Create IAM Role

# Create the role
aws iam create-role \
  --role-name lambda-file-parser-role \
  --assume-role-policy-document file://trust-policy.json

# Attach basic Lambda execution policy
aws iam attach-role-policy \
  --role-name lambda-file-parser-role \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

### 4.3 Create Custom S3 Policy

# Create S3 policy (written with a heredoc so the bucket variables expand)
cat > s3-policy.json << EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::${INPUT_BUCKET}/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:PutObjectAcl"
      ],
      "Resource": "arn:aws:s3:::${OUTPUT_BUCKET}/*"
    }
  ]
}
EOF

# Create and attach policy
aws iam create-policy \
  --policy-name lambda-file-parser-s3-policy \
  --policy-document file://s3-policy.json

aws iam attach-role-policy \
  --role-name lambda-file-parser-role \
  --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/lambda-file-parser-s3-policy

## Step 5: Create Lambda Function

### 5.1 Get Layer ARN

# Get the layer ARN (adjust the index if you need a specific layer version)
LAYER_ARN=$(aws lambda list-layer-versions --layer-name pandas-layer --query 'LayerVersions[0].LayerVersionArn' --output text)
echo "Layer ARN: ${LAYER_ARN}"


### 5.2 Create Lambda Function

# Get role ARN
ROLE_ARN="arn:aws:iam::${ACCOUNT_ID}:role/lambda-file-parser-role"

# Create the function
aws lambda create-function \
  --function-name file-parser \
  --runtime python3.12 \
  --role ${ROLE_ARN} \
  --handler lambda_function.lambda_handler \
  --zip-file fileb://function.zip \
  --timeout 300 \
  --memory-size 512 \
  --layers ${LAYER_ARN} \
  --description "S3-triggered file parser using pandas"

echo "Lambda function created successfully!"


## Step 6: Configure S3 Event Trigger

### 6.1 Add Lambda Permission for S3


# Get Lambda function ARN
FUNCTION_ARN=$(aws lambda get-function --function-name file-parser --query 'Configuration.FunctionArn' --output text)

# Add permission for S3 to invoke Lambda
aws lambda add-permission \
  --function-name file-parser \
  --principal s3.amazonaws.com \
  --action lambda:InvokeFunction \
  --source-arn arn:aws:s3:::${INPUT_BUCKET} \
  --statement-id s3-trigger-permission

### 6.2 Create S3 Event Notification

# Create notification configuration (heredoc so ${FUNCTION_ARN} expands)
cat > notification-config.json << EOF
{
  "LambdaConfigurations": [
    {
      "Id": "file-parser-trigger",
      "LambdaFunctionArn": "${FUNCTION_ARN}",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            {
              "Name": "suffix",
              "Value": ".txt"
            }
          ]
        }
      }
    }
  ]
}
EOF

# Apply notification configuration
aws s3api put-bucket-notification-configuration \
  --bucket ${INPUT_BUCKET} \
  --notification-configuration file://notification-config.json

echo "S3 trigger configured successfully!"


## Step 7: Test the Setup

### 7.1 Create Test File

# Create a sample input file
cat > test-input.txt << EOF
*development
web-service:v1.0.0
api-service:v1.1.0
database:postgres-13

*staging
web-service:v1.0.1
api-service:v1.1.0
database:postgres-13
redis:v6.2

*production
web-service:v1.0.1
api-service:v1.1.0
database:postgres-13
redis:v6.2
monitoring:v2.0.0
EOF

### 7.2 Upload and Test

# Upload test file
aws s3 cp test-input.txt s3://${INPUT_BUCKET}/

# Wait a few seconds, then check output bucket
sleep 10
aws s3 ls s3://${OUTPUT_BUCKET}/processed/

# Download the result
aws s3 cp s3://${OUTPUT_BUCKET}/processed/ ./ --recursive
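
If everything worked, the downloaded CSV for the sample file above should look roughly like this (assuming the default pandas column/row ordering; the exact order may differ):

,development,staging,production
web-service,v1.0.0,v1.0.1,v1.0.1
api-service,v1.1.0,v1.1.0,v1.1.0
database,postgres-13,postgres-13,postgres-13
redis,,v6.2,v6.2
monitoring,,,v2.0.0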

### 7.3 Check Lambda Logs

# View Lambda logs
aws logs describe-log-groups --log-group-name-prefix /aws/lambda/file-parser

# Get recent logs
aws logs filter-log-events \
  --log-group-name /aws/lambda/file-parser \
  --start-time $(date -d '5 minutes ago' +%s)000


## Step 8: Monitor and Troubleshoot

### 8.1 CloudWatch Monitoring

# Check Lambda metrics
aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda \
  --metric-name Invocations \
  --dimensions Name=FunctionName,Value=file-parser \
  --statistics Sum \
  --start-time $(date -d '1 hour ago' --iso-8601=seconds) \
  --end-time $(date --iso-8601=seconds)
