Read files from an AWS S3 bucket based on their creation date (S3 exposes only LastModified, which is used as a stand-in) using AWS Glue
from datetime import datetime, timezone

import boto3
from awsglue.context import GlueContext
from pyspark.context import SparkContext

# Example configuration -- replace with real values before running the job.
BUCKET_NAME = 'your-bucket-name'
START_DATE = datetime(2024, 1, 1)  # Files created after January 1, 2024
OUTPUT_PATH = "s3://output-bucket/output-path/"


def _as_utc(moment):
    """Return *moment* as a timezone-aware UTC datetime.

    A naive datetime is interpreted as UTC. This keeps the meaning the
    script always had for a naive ``start_date``: the object's timestamp
    used to be stripped of its timezone and compared directly.
    """
    if moment.utcoffset() is None:
        return moment.replace(tzinfo=timezone.utc)
    return moment.astimezone(timezone.utc)


# First, use boto3 to list and filter objects
def list_s3_files_after_date(bucket_name, prefix='', start_date=None):
    """List the keys under a prefix that were modified after a cutoff.

    S3 doesn't have a separate creation date, so ``LastModified`` is used.

    Args:
        bucket_name: Name of the bucket to list.
        prefix: Only keys starting with this prefix are considered.
        start_date: Cutoff ``datetime``; only objects modified strictly
            after it are returned. Naive values are treated as UTC and
            timezone-aware values are converted to UTC. ``None`` disables
            the filter, so every listed key is returned.

    Returns:
        A list of object keys (without the ``s3://bucket/`` part).
    """
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')

    # Normalise once, outside the loop. Comparing aware datetimes on both
    # sides avoids the TypeError that a timezone-aware cutoff used to cause.
    cutoff = None if start_date is None else _as_utc(start_date)

    matching_keys = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # A page for an empty listing has no 'Contents' entry.
        for obj in page.get('Contents', []):
            # NOTE(review): boto3 is expected to return LastModified as a
            # timezone-aware UTC datetime -- confirm against the boto3 docs.
            if cutoff is None or _as_utc(obj['LastModified']) > cutoff:
                matching_keys.append(obj['Key'])
    return matching_keys


# Then, use AWS Glue to create a DynamicFrame from the filtered files
def read_filtered_files_to_dynamicframe(bucket_name, filtered_files,
                                        glue_context=None):
    """Load the given S3 keys into a single Glue DynamicFrame.

    Args:
        bucket_name: Bucket that holds every key in ``filtered_files``.
        filtered_files: Object keys to read (relative to the bucket).
        glue_context: Existing ``GlueContext`` to reuse. When omitted, one
            is built on top of the current SparkContext (created if none
            exists yet), so repeated calls do not try to start a second
            SparkContext.

    Returns:
        The DynamicFrame, or ``None`` (after printing a message) when
        ``filtered_files`` is empty.
    """
    if not filtered_files:
        print("No files found matching the criteria.")
        return None

    if glue_context is None:
        glue_context = GlueContext(SparkContext.getOrCreate())

    return glue_context.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={
            "paths": [f"s3://{bucket_name}/{key}" for key in filtered_files],
            "recurse": True,
            "useS3ListImplementation": True,
        },
        format="csv",  # Adjust this based on your file format
        format_options={
            "withHeader": True,
            "separator": ",",
        },
    )


def main():
    """Filter the bucket by date, load the matches, and write them as Parquet."""
    filtered_files = list_s3_files_after_date(BUCKET_NAME, start_date=START_DATE)

    # One GlueContext is shared by the read and the write below; previously
    # the write referenced a name that only existed inside the reader.
    glue_context = GlueContext(SparkContext.getOrCreate())
    dyf = read_filtered_files_to_dynamicframe(
        BUCKET_NAME, filtered_files, glue_context=glue_context
    )
    if dyf is None:
        return

    # Print the schema of the DynamicFrame
    dyf.printSchema()

    # Convert to DataFrame for further processing if needed
    df = dyf.toDF()
    # Perform your manipulations here. The write below still persists
    # `dyf`, so convert any results back to a DynamicFrame before writing
    # if they should be saved.

    # Write the result back to S3 if needed
    glue_context.write_dynamic_frame.from_options(
        frame=dyf,
        connection_type="s3",
        connection_options={"path": OUTPUT_PATH},
        format="parquet",
    )


if __name__ == "__main__":
    main()