Thanks for your help!


I'm indeed trying to use the spark.databricks.delta.schema.autoMerge.enabled configuration.

I set the config using the following command:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")


and wrote my merge command as below:

Target_Table = DeltaTable.forPath(spark, Target_Table_path)

# Insert non-existing records into the Target table; update existing records
# with end_date and ActiveRecord = 0
Target_Table.alias('dwh') \
    .merge(
        Source_Table_dataframe.alias('updates'),
        '(dwh.Key == updates.Key)'
    ) \
    .whenMatchedUpdate(set = {
        "end_date": "date_sub(current_date(), 1)",
        "ActiveRecord": "0"
    }) \
    .whenNotMatchedInsertAll() \
    .execute()


but I get an error message: "cannot resolve column1 in INSERT clause given columns ...", where the listed source-table columns no longer include column1.

The error occurs because the merge's INSERT clause references a column that exists in the target table but not in the source. Delta Lake rejects the merge because it cannot resolve a value for that column from the source row.

To fix this, the columns used by the INSERT clause must exist on both sides of the merge. In your case, the target table has a column called column1 that no longer exists in the source. You can either list the inserted columns explicitly in the merge (leaving column1 out), or add column1 back to the source DataFrame.
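A quick way to confirm which columns are out of sync is to diff the two column lists. This is a minimal sketch with assumed schemas; in a notebook you would use Source_Table_dataframe.columns and Target_Table.toDF().columns instead of the hard-coded lists:

```python
# Hypothetical column lists standing in for the real schemas
target_cols = ["Key", "column1", "end_date", "ActiveRecord"]
source_cols = ["Key", "end_date", "ActiveRecord"]

# Columns the INSERT clause cannot resolve: present in the target, gone from the source
missing_in_source = [c for c in target_cols if c not in source_cols]
print(missing_in_source)  # ['column1']
```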

If you leave column1 out, note that whenNotMatchedInsertAll() does not take an exclusion list; use whenNotMatchedInsert(values=...) instead and list only the columns that exist in the source. Any target column missing from the values map (such as column1) is inserted as NULL. For example:

Code snippet

Target_Table.alias('dwh') \
    .merge(
        Source_Table_dataframe.alias('updates'),
        '(dwh.Key == updates.Key)'
    ) \
    .whenMatchedUpdate(set = {
        "end_date": "date_sub(current_date(), 1)",
        "ActiveRecord": "0"
    }) \
    .whenNotMatchedInsert(values = {
        # list every target column that still exists in the source;
        # omitted columns (such as column1) are inserted as NULL
        "Key": "updates.Key",
        "ActiveRecord": "1"  # assumption: newly inserted records start as active
    }) \
    .execute()
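To avoid typing every column by hand in whenNotMatchedInsert, the values mapping can also be generated from the source DataFrame's column list. A sketch, where source_cols stands in for Source_Table_dataframe.columns:

```python
# Build the INSERT values map from the source columns: each target column
# is filled from the identically named column of the 'updates' alias.
source_cols = ["Key", "end_date", "ActiveRecord"]  # stand-in for Source_Table_dataframe.columns
insert_values = {c: "updates." + c for c in source_cols}
print(insert_values)
```

The resulting dict can be passed directly as the values argument, so renamed or dropped source columns never have to be tracked by hand in the merge statement.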

 

If you instead want to keep whenNotMatchedInsertAll(), add column1 back to the source DataFrame before the merge, for example as a NULL literal (cast it to column1's actual type in the target). For example:

Code snippet

from pyspark.sql.functions import lit

Source_Table_dataframe = Source_Table_dataframe \
    .withColumn('column1', lit(None).cast('string'))

Target_Table.alias('dwh') \
    .merge(
        Source_Table_dataframe.alias('updates'),
        '(dwh.Key == updates.Key)'
    ) \
    .whenMatchedUpdate(set = {
        "end_date": "date_sub(current_date(), 1)",
        "ActiveRecord": "0"
    }) \
    .whenNotMatchedInsertAll() \
    .execute()

 

Once you have made the necessary changes, the merge should run without the column-resolution error.