Add leading zeros to PySpark DataFrame

PySpark DataFrame - add leading zero if the value is a single digit between 0 and 9 from pyspark.sql import SparkSession from pyspark.sql.functions import col, when, regexp_extract, length, lpad # Create a SparkSession spark = SparkSession.builder.appName("TupleToDataFrame").getOrCreate() # List of tuples arr = [("a", "0a1x"), ("x", "3"), ("cc", "11"), ("h", "5")] # Create a DataFrame df = spark.createDataFrame(arr, ["column1", "column2"]) # Function to add leading zero if the value is a single digit between 0 and 9, # replace non-digit values with "0000", and leave other values as is df = df.withColumn("column2", when(regexp_extract(col("column2"), "^\\d+$", 0) == "", "0000") .when((regexp_extract(col("column2"), "^\\d+$", 0) != "") & (length(col("column2")) == 1), lpad(col("column2"), 2, "0")) .otherwise(col("column2")) ) # Show the updated DataFrame df.show()

December 2, 2024 · 1 min

Read deleted S3 objects in AWS Glue

Read files marked as deleted from an AWS S3 bucket into a DynamicFrame using boto3 and AWS Glue import boto3 from awsglue.context import GlueContext from pyspark.context import SparkContext from awsglue.dynamicframe import DynamicFrame def read_deleted_files_to_dynamicframe(bucket_name, prefix=''): # Initialize Glue context sc = SparkContext() glueContext = GlueContext(sc) # Initialize S3 client s3 = boto3.client('s3') # List object versions including delete markers paginator = s3.get_paginator('list_object_versions') page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix) # Collect keys of deleted files deleted_files = [] for page in page_iterator: if 'DeleteMarkers' in page: for delete_marker in page['DeleteMarkers']: if delete_marker['IsLatest']: deleted_files.append(delete_marker['Key']) # Read deleted files into a DynamicFrame if deleted_files: # Create a DynamicFrame from the deleted files dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={ "paths": [f"s3://{bucket_name}/{key}" for key in deleted_files], "recurse": True, "useS3ListImplementation": True, "readVersion": "LATEST_PREVIOUS" # Read the version before the delete marker }, format="csv", # Adjust this based on your file format format_options={ "withHeader": True, "separator": "," } ) return dyf else: print("No deleted files found.") return None # Usage bucket_name = 'your-bucket-name' prefix = 'your-folder-prefix/' # Optional dyf = read_deleted_files_to_dynamicframe(bucket_name, prefix) if dyf: # Print the schema of the DynamicFrame dyf.printSchema() # Convert to DataFrame for further processing if needed df = dyf.toDF() # Perform your manipulations here # Convert back to DynamicFrame if necessary result_dyf = DynamicFrame.fromDF(df, glueContext, "result_dyf") # Write the result back to S3 glueContext.write_dynamic_frame.from_options( frame=result_dyf, connection_type="s3", connection_options={"path": "s3://output-bucket/output-path/"}, format="parquet" )

September 30, 2024 · 2 min

Read S3 objects based on date in AWS Glue

Read files from an AWS S3 bucket based on their creation date using AWS Glue import boto3 from datetime import datetime from awsglue.context import GlueContext from pyspark.context import SparkContext # First, use boto3 to list and filter objects def list_s3_files_after_date(bucket_name, prefix='', start_date=None): s3 = boto3.client('s3') paginator = s3.get_paginator('list_objects_v2') page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix) filtered_files = [] for page in page_iterator: if 'Contents' in page: for obj in page['Contents']: # S3 doesn't have a separate creation date, so we use LastModified if start_date is None or obj['LastModified'].replace(tzinfo=None) > start_date: filtered_files.append(obj['Key']) return filtered_files # Usage bucket_name = 'your-bucket-name' start_date = datetime(2024, 1, 1) # Files created after January 1, 2024 filtered_files = list_s3_files_after_date(bucket_name, start_date=start_date) # Then, use AWS Glue to create a DynamicFrame from the filtered files def read_filtered_files_to_dynamicframe(bucket_name, filtered_files): sc = SparkContext() glueContext = GlueContext(sc) if filtered_files: dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={ "paths": [f"s3://{bucket_name}/{key}" for key in filtered_files], "recurse": True, "useS3ListImplementation": True }, format="csv", # Adjust this based on your file format format_options={ "withHeader": True, "separator": "," } ) return dyf else: print("No files found matching the criteria.") return None # Usage bucket_name = 'your-bucket-name' start_date = datetime(2024, 1, 1) # Files created after January 1, 2024 filtered_files = list_s3_files_after_date(bucket_name, start_date=start_date) dyf = read_filtered_files_to_dynamicframe(bucket_name, filtered_files) if dyf: # Print the schema of the DynamicFrame dyf.printSchema() # Convert to DataFrame for further processing if needed df = dyf.toDF() # Perform your manipulations here # Write the result back to S3 if needed glueContext.write_dynamic_frame.from_options( frame=dyf, connection_type="s3", connection_options={"path": "s3://output-bucket/output-path/"}, format="parquet" )

September 30, 2024 · 2 min

pipx Setup

Install pipx on Windows Run commands in Git Bash env PIP_REQUIRE_VIRTUALENV=0 python -m pip install --user pipx python -m pipx ensurepath --force Open a new Git Bash terminal pipx --version If getting an error during the installation of packages using pipx No Python at 'C:\Users\<username>\AppData\Local\Programs\Python\PythonX\python.exe' Then remove the following folder C:\Users\<username>\.local\pipx If still having the same issue Then remove the following folder as well C:\Users\<username>\AppData\Local\pipx

May 30, 2024 · 1 min

Keras Transfer Learning

Transfer learning with VGG and Keras for image classification task # Credits: # Author: Gabriel Cassimiro # Blog post: https://towardsdatascience.com/transfer-learning-with-vgg16-and-keras-50ea161580b4 # GitHub Repo: https://github.com/gabrielcassimiro17/object-detection # import tensorflow_datasets as tfds from tensorflow.keras import layers, models from tensorflow.keras.utils import to_categorical from tensorflow.keras.callbacks import EarlyStopping ## Loading images and labels (train_ds, train_labels), (test_ds, test_labels) = tfds.load( "tf_flowers", split=["train[:70%]", "train[:30%]"], ## Train test split batch_size=-1, as_supervised=True, # Include labels ) ## Resizing images train_ds = tf.image.resize(train_ds, (150, 150)) test_ds = tf.image.resize(test_ds, (150, 150)) ## Transforming labels to correct format train_labels = to_categorical(train_labels, num_classes=5) test_labels = to_categorical(test_labels, num_classes=5) from tensorflow.keras.applications.vgg16 import VGG16 from tensorflow.keras.applications.vgg16 import preprocess_input ## Loading VGG16 model base_model = VGG16(weights="imagenet", include_top=False, input_shape=train_ds[0].shape) base_model.trainable = False ## Not trainable weights ## Preprocessing input train_ds = preprocess_input(train_ds) test_ds = preprocess_input(test_ds) flatten_layer = layers.Flatten() dense_layer_1 = layers.Dense(50, activation='relu') dense_layer_2 = layers.Dense(20, activation='relu') prediction_layer = layers.Dense(5, activation='softmax') model = models.Sequential([ base_model, flatten_layer, dense_layer_1, dense_layer_2, prediction_layer ]) model.compile( optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'], ) es = EarlyStopping(monitor='val_accuracy', mode='max', patience=5, restore_best_weights=True) model.fit(train_ds, train_labels, epochs=50, validation_split=0.2, batch_size=32, callbacks=[es]) model.evaluate(test_ds, test_labels)

April 9, 2023 · 1 min