Read deleted S3 objects in AWS Glue

Read files marked as deleted from an AWS S3 bucket into a DynamicFrame using boto3 and AWS Glue import boto3 from awsglue.context import GlueContext from pyspark.context import SparkContext from awsglue.dynamicframe import DynamicFrame def read_deleted_files_to_dynamicframe(bucket_name, prefix=''): # Initialize Glue context sc = SparkContext() glueContext = GlueContext(sc) # Initialize S3 client s3 = boto3.client('s3') # List object versions including delete markers paginator = s3.get_paginator('list_object_versions') page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix) # Collect keys of deleted files deleted_files = [] for page in page_iterator: if 'DeleteMarkers' in page: for delete_marker in page['DeleteMarkers']: if delete_marker['IsLatest']: deleted_files.append(delete_marker['Key']) # Read deleted files into a DynamicFrame if deleted_files: # Create a DynamicFrame from the deleted files dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={ "paths": [f"s3://{bucket_name}/{key}" for key in deleted_files], "recurse": True, "useS3ListImplementation": True, "readVersion": "LATEST_PREVIOUS" # Read the version before the delete marker }, format="csv", # Adjust this based on your file format format_options={ "withHeader": True, "separator": "," } ) return dyf else: print("No deleted files found.") return None # Usage bucket_name = 'your-bucket-name' prefix = 'your-folder-prefix/' # Optional dyf = read_deleted_files_to_dynamicframe(bucket_name, prefix) if dyf: # Print the schema of the DynamicFrame dyf.printSchema() # Convert to DataFrame for further processing if needed df = dyf.toDF() # Perform your manipulations here # Convert back to DynamicFrame if necessary result_dyf = DynamicFrame.fromDF(df, glueContext, "result_dyf") # Write the result back to S3 glueContext.write_dynamic_frame.from_options( frame=result_dyf, connection_type="s3", connection_options={"path": "s3://output-bucket/output-path/"}, format="parquet" )

September 30, 2024 · 2 min

Read S3 objects based on date in AWS Glue

Read files from an AWS S3 bucket based on their creation date using AWS Glue import boto3 from datetime import datetime from awsglue.context import GlueContext from pyspark.context import SparkContext # First, use boto3 to list and filter objects def list_s3_files_after_date(bucket_name, prefix='', start_date=None): s3 = boto3.client('s3') paginator = s3.get_paginator('list_objects_v2') page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix) filtered_files = [] for page in page_iterator: if 'Contents' in page: for obj in page['Contents']: # S3 doesn't have a separate creation date, so we use LastModified if start_date is None or obj['LastModified'].replace(tzinfo=None) > start_date: filtered_files.append(obj['Key']) return filtered_files # Usage bucket_name = 'your-bucket-name' start_date = datetime(2024, 1, 1) # Files created after January 1, 2024 filtered_files = list_s3_files_after_date(bucket_name, start_date=start_date) # Then, use AWS Glue to create a DynamicFrame from the filtered files def read_filtered_files_to_dynamicframe(bucket_name, filtered_files): sc = SparkContext() glueContext = GlueContext(sc) if filtered_files: dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={ "paths": [f"s3://{bucket_name}/{key}" for key in filtered_files], "recurse": True, "useS3ListImplementation": True }, format="csv", # Adjust this based on your file format format_options={ "withHeader": True, "separator": "," } ) return dyf else: print("No files found matching the criteria.") return None # Usage bucket_name = 'your-bucket-name' start_date = datetime(2024, 1, 1) # Files created after January 1, 2024 filtered_files = list_s3_files_after_date(bucket_name, start_date=start_date) dyf = read_filtered_files_to_dynamicframe(bucket_name, filtered_files) if dyf: # Print the schema of the DynamicFrame dyf.printSchema() # Convert to DataFrame for further processing if needed df = dyf.toDF() # Perform your manipulations here # Write the result back to S3 if needed glueContext.write_dynamic_frame.from_options( frame=dyf, connection_type="s3", connection_options={"path": "s3://output-bucket/output-path/"}, format="parquet" )

September 30, 2024 · 2 min