Read files from an AWS S3 bucket based on their creation date using AWS Glue

The approach has two steps: use boto3 to list the objects and filter them by their LastModified timestamp (S3 does not expose a separate creation date), then hand the matching keys to AWS Glue to build a DynamicFrame.

import boto3
from datetime import datetime
from awsglue.context import GlueContext
from pyspark.context import SparkContext

# First, use boto3 to list and filter objects

def list_s3_files_after_date(bucket_name, prefix='', start_date=None):
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

    filtered_files = []
    for page in page_iterator:
        if 'Contents' in page:
            for obj in page['Contents']:
                # S3 doesn't expose a separate creation date, so LastModified is the closest proxy.
                # LastModified is timezone-aware (UTC); drop tzinfo so it compares with a naive start_date.
                if start_date is None or obj['LastModified'].replace(tzinfo=None) > start_date:
                    filtered_files.append(obj['Key'])

    return filtered_files

# Usage
bucket_name = 'your-bucket-name'
start_date = datetime(2024, 1, 1)  # Files created after January 1, 2024
filtered_files = list_s3_files_after_date(bucket_name, start_date=start_date)
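
# Optional: on large buckets, pass a prefix so ListObjectsV2 only scans one
# key space instead of the whole bucket. A small illustrative sketch;
# 'logs/2024/' is a hypothetical prefix, adjust it to your bucket layout.
recent_logs = list_s3_files_after_date(
    bucket_name,
    prefix='logs/2024/',
    start_date=start_date
)
print(f"Found {len(recent_logs)} objects modified after {start_date}")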

# Then, use AWS Glue to create a DynamicFrame from the filtered files

# Create the Spark and Glue contexts once at module level so the same
# glueContext can be reused by both the read below and the write at the end
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

def read_filtered_files_to_dynamicframe(bucket_name, filtered_files):
    
    if filtered_files:
        dyf = glueContext.create_dynamic_frame.from_options(
            connection_type="s3",
            connection_options={
                "paths": [f"s3://{bucket_name}/{key}" for key in filtered_files],
                "recurse": True,
                "useS3ListImplementation": True
            },
            format="csv",  # Adjust this based on your file format
            format_options={
                "withHeader": True,
                "separator": ","
            }
        )
        return dyf
    else:
        print("No files found matching the criteria.")
        return None
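
# An alternative sketch: if you only need a Spark DataFrame rather than a
# DynamicFrame, the same filtered keys can be read directly through the Spark
# session that Glue exposes. Assumes CSV files with a header row.
def read_filtered_files_to_dataframe(bucket_name, filtered_files):
    spark = glueContext.spark_session
    paths = [f"s3://{bucket_name}/{key}" for key in filtered_files]
    # DataFrameReader.csv accepts a list of paths, so no extra listing is needed
    return spark.read.option("header", "true").option("sep", ",").csv(paths)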

# Usage, reusing bucket_name and filtered_files from above
dyf = read_filtered_files_to_dynamicframe(bucket_name, filtered_files)

if dyf is not None:
    # Print the schema of the DynamicFrame
    dyf.printSchema()
    
    # Convert to DataFrame for further processing if needed
    df = dyf.toDF()
    
    # Perform your manipulations here
    
    # Write the result back to S3 if needed
    glueContext.write_dynamic_frame.from_options(
        frame=dyf,
        connection_type="s3",
        connection_options={"path": "s3://output-bucket/output-path/"},
        format="parquet"
    )
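
# In a deployed Glue job, the logic above is usually wrapped in the standard
# job boilerplate below so job parameters and bookmarks work. This sketch only
# pulls the built-in 'JOB_NAME' argument; extra parameters such as the bucket
# name or cutoff date could be passed the same way if you define them on the job.
import sys
from awsglue.utils import getResolvedOptions
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ... the listing, read, and write steps from above go here ...

job.commit()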