Funciones independientes que retornan un when
. Poseen el filtro que se aplicaran posteriormente.
def c1_total_blanco(dataframe_main, uso, when_extra):
return when((dataframe_main.ORGANIZACION_VENTAS == 'CL11') &
((dataframe_main.NIVEL_2 == 'A') | (dataframe_main.NIVEL_2 == 'M')) &
(dataframe_main.NIVEL_3 == 'REC') &
(dataframe_main.NIVEL_4 == 'BL') &
((dataframe_main.CANAL_DISTRIBUCION != '23') & (dataframe_main.CANAL_DISTRIBUCION != '13') & (dataframe_main.CANAL_DISTRIBUCION != '84')),
lit('Total blanco').cast(StringType()) if uso == 'I' else lit('CL04').cast(StringType())).otherwise(when_extra)
def c2_total_diseno(dataframe_main, uso):
return when((dataframe_main.ORGANIZACION_VENTAS == 'CL11') &
((dataframe_main.NIVEL_2 == 'A') | (dataframe_main.NIVEL_2 == 'M')) &
(dataframe_main.NIVEL_3 == 'REC') &
(dataframe_main.NIVEL_4 == 'DI') &
((dataframe_main.CANAL_DISTRIBUCION != '23') & (dataframe_main.CANAL_DISTRIBUCION != '13') & (dataframe_main.CANAL_DISTRIBUCION != '84')),
lit('Total diseño').cast(StringType()) if uso == 'I' else lit('CL03').cast(StringType()))
Función que crea las columnas INDICADOR y LLAVE_CONSOLIDADO.
def build_kpi(dataframe_dict):
dataframe_main = dataframe_dict['dataframe_rename_output']
dataframe_main = dataframe_main.withColumn('INDICADOR', c1_total_blanco(dataframe_main, 'I',
c2_total_diseno(dataframe_main, 'I')))
dataframe_main = dataframe_main.withColumn('LLAVE_CONSOLIDADO', c1_total_blanco(dataframe_main, 'LL',
c2_total_diseno(dataframe_main, 'LL')))
return { 'dataframe_with_kpi':dataframe_main }
Función que crea las columnas INDICADOR y LLAVE_CONSOLIDADO, se definen los filtros de INDICADOR junto con sus respectivos valores. Luego la LLAVE_CONSOLIDADO se va armando de acuerdo a los valores que existen en INDICADOR.
def build_kpi(dataframe_dict):
dataframe_main = dataframe_dict['dataframe_with_center_tipo_material_nivel']
dataframe_main = dataframe_main.withColumn('INDICADOR',
when((dataframe_main.ORIGEN=='ProduccionRep') & (dataframe_main.CODIGO_MATERIAL.contains('TROZOGEN') == False) & (dataframe_main.PUESTO_DE_TRABAJO == 'ASER-EWD'),'Maderas')
.when((dataframe_main.ORIGEN=='ProduccionRep') & (dataframe_main.PUESTO_DE_TRABAJO.startswith('TRE-M')),'Molduras')
.when((dataframe_main.ORIGEN=='0PROD_LINEAMDF') & (dataframe_main.PUESTO_DE_TRABAJO == 'MDF') & ((dataframe_main.CLASE_MOVIMIENTO=='101') | (dataframe_main.CLASE_MOVIMIENTO=='102')),'Tableros')
.when((dataframe_main.ORIGEN=='0PROD_LINEAMDF') & (dataframe_main.PUESTO_DE_TRABAJO == 'TERMINA') & ((dataframe_main.CLASE_MOVIMIENTO=='531') | (dataframe_main.CLASE_MOVIMIENTO=='532')),'Tableros')
.when((dataframe_main.ORIGEN=='0PROD_LINEAPB') & (dataframe_main.PUESTO_DE_TRABAJO == 'LINEA1') & ((dataframe_main.CLASE_MOVIMIENTO=='101') | (dataframe_main.CLASE_MOVIMIENTO=='102')),'Tableros')
.when((dataframe_main.ORIGEN=='0PROD_LINEAPB') & (dataframe_main.PUESTO_DE_TRABAJO == 'TERMINA') & ((dataframe_main.CLASE_MOVIMIENTO=='531') | (dataframe_main.CLASE_MOVIMIENTO=='532')),'Tableros')
.when((dataframe_main.ORIGEN=='CIERRE_00_2') & (dataframe_main.PUESTO_DE_TRABAJO == 'MDF2') & ((dataframe_main.CLASE_MOVIMIENTO=='101') | (dataframe_main.CLASE_MOVIMIENTO=='102')),'Tableros')
.when((dataframe_main.ORIGEN=='CIERRE_00_2') & (dataframe_main.PUESTO_DE_TRABAJO == 'FORMATO2') & ((dataframe_main.CLASE_MOVIMIENTO=='531') | (dataframe_main.CLASE_MOVIMIENTO=='532')),'Tableros')
.when((dataframe_main.ORIGEN=='CIERRE_00_4') & (dataframe_main.PUESTO_DE_TRABAJO == 'MDP') & ((dataframe_main.CLASE_MOVIMIENTO=='101') | (dataframe_main.CLASE_MOVIMIENTO=='102')),'Tableros')
.when((dataframe_main.ORIGEN=='CIERRE_00_4') & (dataframe_main.PUESTO_DE_TRABAJO == 'TERMINA') & ((dataframe_main.CLASE_MOVIMIENTO=='531') | (dataframe_main.CLASE_MOVIMIENTO=='532')),'Tableros')
.when((dataframe_main.ORIGEN=='CIERRE_00_4') & (dataframe_main.PUESTO_DE_TRABAJO == 'TERMINA') & ((dataframe_main.CLASE_MOVIMIENTO=='101') | (dataframe_main.CLASE_MOVIMIENTO=='102')) & (dataframe_main.GRUPO_ARTICULOS == 'C02'),'Tableros'))
dataframe_main = dataframe_main.withColumn('LLAVE_CONSOLIDADO', when(dataframe_main.INDICADOR == 'Maderas', 'CL18')
.when(dataframe_main.INDICADOR == 'Molduras', 'CL17')
.when(dataframe_main.INDICADOR == 'Tableros', 'CL16')
.otherwise(lit('PRD-SIN-LLAVE')).cast(StringType()))
return {'dataframe_kpi':dataframe_main}
Creamos una función donde se crean varias columnas y se pivotan otras.
def kpis_func(dataframes_dict):
df = dataframes_dict['add_correct_data_types']
#
fil1 = df.filter('codigo_cuenta== 15 and pais== "CL" and codigo_mercado==1 and grupo_producto_01 in ("SIN","SMU") and grupo_producto_03=="DES"') \\
.groupBy('CALMONTH','pais','codigo_version').sum('cantidad') \\
.withColumn('mercado',lit('local')) \\
.withColumn('producto',lit('tableros')) \\
.withColumn('descripcion',lit('desnudo')) \\
.groupBy('CALMONTH','pais','mercado','producto','descripcion').pivot('codigo_version').sum('sum(cantidad)') \\
.sort('CALMONTH')
#
fil2 = df.filter('codigo_cuenta== 15 and pais== "CL" and codigo_mercado==1 and grupo_producto_01 in ("SIN","SMU") and grupo_producto_03=="REC"') \\
.groupBy('CALMONTH','pais','codigo_version').sum('cantidad') \\
.withColumn('mercado',lit('local')) \\
.withColumn('producto',lit('tableros')) \\
.withColumn('descripcion',lit('recubiertos')) \\
.groupBy('CALMONTH','pais','mercado','producto','descripcion').pivot('codigo_version').sum('sum(cantidad)') \\
.sort('CALMONTH')
Creamos una función para crear únicamente LLAVE_CONSOLIDADO en base a los valores de otras columnas.
def add_llave_consolidado(dataframes_dict):
df = dataframes_dict['add_kpis']
df = df.withColumn(
'llave_consolidado',
when( (df.pais == 'CL') & (df.mercado == 'local') & (df.descripcion == 'desnudo'), lit('CL01') )
.when( (df.pais == 'CL') & (df.mercado == 'local') & (df.descripcion == 'recubiertos'), lit('CL02') )
)
return { 'add_llave_consolidado': df }