Crear llave de agrupación

Alternativa #1

Funciones independientes que retornan un when. Poseen el filtro que se aplicaran posteriormente.

def c1_total_blanco(dataframe_main, uso, when_extra):
  return when((dataframe_main.ORGANIZACION_VENTAS == 'CL11') & 
              ((dataframe_main.NIVEL_2 == 'A') | (dataframe_main.NIVEL_2 == 'M')) & 
              (dataframe_main.NIVEL_3 == 'REC') & 
              (dataframe_main.NIVEL_4 == 'BL') & 
              ((dataframe_main.CANAL_DISTRIBUCION != '23') & (dataframe_main.CANAL_DISTRIBUCION != '13') & (dataframe_main.CANAL_DISTRIBUCION != '84')), 
              lit('Total blanco').cast(StringType()) if uso == 'I' else lit('CL04').cast(StringType())).otherwise(when_extra)

def c2_total_diseno(dataframe_main, uso):
  return when((dataframe_main.ORGANIZACION_VENTAS == 'CL11') & 
              ((dataframe_main.NIVEL_2 == 'A') | (dataframe_main.NIVEL_2 == 'M')) & 
              (dataframe_main.NIVEL_3 == 'REC') & 
              (dataframe_main.NIVEL_4 == 'DI') & 
              ((dataframe_main.CANAL_DISTRIBUCION != '23') & (dataframe_main.CANAL_DISTRIBUCION != '13') & (dataframe_main.CANAL_DISTRIBUCION != '84')), 
              lit('Total diseño').cast(StringType()) if uso == 'I' else lit('CL03').cast(StringType()))

Función que crea las columnas INDICADOR y LLAVE_CONSOLIDADO.

def build_kpi(dataframe_dict):
  dataframe_main = dataframe_dict['dataframe_rename_output']
  dataframe_main = dataframe_main.withColumn('INDICADOR', c1_total_blanco(dataframe_main, 'I', 
    c2_total_diseno(dataframe_main, 'I')))
  dataframe_main = dataframe_main.withColumn('LLAVE_CONSOLIDADO', c1_total_blanco(dataframe_main, 'LL',
    c2_total_diseno(dataframe_main, 'LL')))
  return { 'dataframe_with_kpi':dataframe_main }

Alternativa #2

Función que crea las columnas INDICADOR y LLAVE_CONSOLIDADO, se definen los filtros de INDICADOR junto con sus respectivos valores. Luego la LLAVE_CONSOLIDADO se va armando de acuerdo a los valores que existen en INDICADOR.

def build_kpi(dataframe_dict):
  dataframe_main = dataframe_dict['dataframe_with_center_tipo_material_nivel']
  dataframe_main = dataframe_main.withColumn('INDICADOR', 
		when((dataframe_main.ORIGEN=='ProduccionRep') & (dataframe_main.CODIGO_MATERIAL.contains('TROZOGEN') == False) & (dataframe_main.PUESTO_DE_TRABAJO == 'ASER-EWD'),'Maderas')

		.when((dataframe_main.ORIGEN=='ProduccionRep') & (dataframe_main.PUESTO_DE_TRABAJO.startswith('TRE-M')),'Molduras')

		.when((dataframe_main.ORIGEN=='0PROD_LINEAMDF') & (dataframe_main.PUESTO_DE_TRABAJO == 'MDF') & ((dataframe_main.CLASE_MOVIMIENTO=='101') | (dataframe_main.CLASE_MOVIMIENTO=='102')),'Tableros')
		.when((dataframe_main.ORIGEN=='0PROD_LINEAMDF') & (dataframe_main.PUESTO_DE_TRABAJO == 'TERMINA')  & ((dataframe_main.CLASE_MOVIMIENTO=='531') | (dataframe_main.CLASE_MOVIMIENTO=='532')),'Tableros')
		.when((dataframe_main.ORIGEN=='0PROD_LINEAPB') & (dataframe_main.PUESTO_DE_TRABAJO == 'LINEA1')  & ((dataframe_main.CLASE_MOVIMIENTO=='101') | (dataframe_main.CLASE_MOVIMIENTO=='102')),'Tableros')
		.when((dataframe_main.ORIGEN=='0PROD_LINEAPB') & (dataframe_main.PUESTO_DE_TRABAJO == 'TERMINA')  & ((dataframe_main.CLASE_MOVIMIENTO=='531') | (dataframe_main.CLASE_MOVIMIENTO=='532')),'Tableros')
		.when((dataframe_main.ORIGEN=='CIERRE_00_2') & (dataframe_main.PUESTO_DE_TRABAJO == 'MDF2')  & ((dataframe_main.CLASE_MOVIMIENTO=='101') | (dataframe_main.CLASE_MOVIMIENTO=='102')),'Tableros')
		.when((dataframe_main.ORIGEN=='CIERRE_00_2') & (dataframe_main.PUESTO_DE_TRABAJO == 'FORMATO2')  & ((dataframe_main.CLASE_MOVIMIENTO=='531') | (dataframe_main.CLASE_MOVIMIENTO=='532')),'Tableros')
		.when((dataframe_main.ORIGEN=='CIERRE_00_4') & (dataframe_main.PUESTO_DE_TRABAJO == 'MDP')  & ((dataframe_main.CLASE_MOVIMIENTO=='101') | (dataframe_main.CLASE_MOVIMIENTO=='102')),'Tableros')
		.when((dataframe_main.ORIGEN=='CIERRE_00_4') & (dataframe_main.PUESTO_DE_TRABAJO == 'TERMINA')  & ((dataframe_main.CLASE_MOVIMIENTO=='531') | (dataframe_main.CLASE_MOVIMIENTO=='532')),'Tableros')
		.when((dataframe_main.ORIGEN=='CIERRE_00_4') & (dataframe_main.PUESTO_DE_TRABAJO == 'TERMINA')  & ((dataframe_main.CLASE_MOVIMIENTO=='101') | (dataframe_main.CLASE_MOVIMIENTO=='102')) & (dataframe_main.GRUPO_ARTICULOS == 'C02'),'Tableros'))

	dataframe_main = dataframe_main.withColumn('LLAVE_CONSOLIDADO', when(dataframe_main.INDICADOR == 'Maderas', 'CL18')
		.when(dataframe_main.INDICADOR == 'Molduras', 'CL17')
		.when(dataframe_main.INDICADOR == 'Tableros', 'CL16')
		.otherwise(lit('PRD-SIN-LLAVE')).cast(StringType()))
  return {'dataframe_kpi':dataframe_main}

Alternativa #3

Creamos una función donde se crean varias columnas y se pivotan otras.

def kpis_func(dataframes_dict):
  df = dataframes_dict['add_correct_data_types']
  #
  fil1  = df.filter('codigo_cuenta== 15 and pais== "CL" and codigo_mercado==1 and grupo_producto_01 in ("SIN","SMU") and grupo_producto_03=="DES"') \\
            .groupBy('CALMONTH','pais','codigo_version').sum('cantidad') \\
            .withColumn('mercado',lit('local')) \\
            .withColumn('producto',lit('tableros')) \\
            .withColumn('descripcion',lit('desnudo')) \\
            .groupBy('CALMONTH','pais','mercado','producto','descripcion').pivot('codigo_version').sum('sum(cantidad)') \\
            .sort('CALMONTH')
  #
  fil2  = df.filter('codigo_cuenta== 15 and pais== "CL" and codigo_mercado==1 and grupo_producto_01 in ("SIN","SMU") and grupo_producto_03=="REC"') \\
            .groupBy('CALMONTH','pais','codigo_version').sum('cantidad') \\
            .withColumn('mercado',lit('local')) \\
            .withColumn('producto',lit('tableros')) \\
            .withColumn('descripcion',lit('recubiertos')) \\
            .groupBy('CALMONTH','pais','mercado','producto','descripcion').pivot('codigo_version').sum('sum(cantidad)') \\
            .sort('CALMONTH')

Creamos una función para crear únicamente LLAVE_CONSOLIDADO en base a los valores de otras columnas.

def add_llave_consolidado(dataframes_dict):
  df = dataframes_dict['add_kpis']

  df = df.withColumn(
    'llave_consolidado',
    when( (df.pais == 'CL') & (df.mercado == 'local') & (df.descripcion == 'desnudo'), lit('CL01') )
    .when( (df.pais == 'CL') & (df.mercado == 'local') & (df.descripcion == 'recubiertos'), lit('CL02') )
  )
  return { 'add_llave_consolidado': df }