Read and Write Files

read

df = (sql_ctx.read
    .option('header', 'true')       # whether the file has a header row (i.e., column names)
    .option('inferSchema', 'true')  # infer the schema automatically; from what I've seen this reads the file twice
    .option('delimiter', ',')
    .option('quote', '"')
    .csv('/tmp/csv_test'))
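
Since inferSchema makes Spark scan the file an extra time, supplying an explicit schema avoids the second pass. A minimal sketch; the column names and types here are hypothetical and should match the actual file:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# hypothetical columns; replace with the real layout of /tmp/csv_test
schema = StructType([
    StructField('id', StringType(), True),
    StructField('value', DoubleType(), True),
])

df = (sql_ctx.read
    .option('header', 'true')
    .option('delimiter', ',')
    .option('quote', '"')
    .schema(schema)  # schema given explicitly, so the file is only read once
    .csv('/tmp/csv_test'))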

write

(df.write.mode('overwrite')
    .option('header', 'true')                   # whether to write a header row (column names)
    .option('ignoreLeadingWhiteSpace', False)   # whether to strip leading whitespace from values
    .option('ignoreTrailingWhiteSpace', False)  # whether to strip trailing whitespace from values
    .csv('/tmp/csv_test'))
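
To sanity-check the output, the written directory can be read back with the matching options:

df_check = (sql_ctx.read
    .option('header', 'true')
    .option('delimiter', ',')
    .csv('/tmp/csv_test'))
df_check.show(5)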

Different Data Sources

redshift

SPARK_REDSHIFT = (
    'jdbc:redshift://<url>'
    ':<port>/<schema>?user=<user>&password=<password>'
)
S3_TMP_DIR = '<s3_url>'

# sql query
sql = 'select * from <table> limit 10'
df = sql_ctx.read.format('com.databricks.spark.redshift') \
    .option('url', SPARK_REDSHIFT) \
    .option('query', sql) \
    .option('tempdir', S3_TMP_DIR) \
    .load()

# get data from table
df = sql_ctx.read.format('com.databricks.spark.redshift') \
    .option('url', SPARK_REDSHIFT) \
    .option('dbtable', table_name) \
    .option('tempdir', S3_TMP_DIR) \
    .load()
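
Writing back to Redshift goes through the same connector and staging directory. A rough sketch; the target table name and save mode are assumptions:

# data is staged in S3_TMP_DIR and then loaded into Redshift by the connector
df.write.format('com.databricks.spark.redshift') \
    .option('url', SPARK_REDSHIFT) \
    .option('dbtable', '<target_table>') \
    .option('tempdir', S3_TMP_DIR) \
    .mode('append') \
    .save()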

Custom Pipeline

Based on https://stackoverflow.com/questions/37270446/how-to-roll-a-custom-estimator-in-pyspark-mllib: a custom Estimator/Transformer pair implementing a one-cluster k-means, where the single center is just the per-feature mean.

from pyspark.ml.pipeline import Estimator, Transformer
from pyspark.ml.param.shared import *
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType

class HasK(Params):
    
    k = Param(Params._dummy(), 'k', 'k', TypeConverters.toInt)
    
    def __init__(self):
        super(HasK, self).__init__()
        
    def setK(self, value):
        return self._set(k=value)
    
    def getK(self):
        return self.getOrDefault(self.k)

class HasClusterCenters(Params):

    cluster_centers = Param(Params._dummy(), 'cluster_centers', 'cluster_centers', TypeConverters.toList)

    def __init__(self):
        super(HasClusterCenters, self).__init__()

    def setClusterCenters(self, value):
        return self._set(cluster_centers=value)

    def getClusterCenters(self):
        return self.getOrDefault(self.cluster_centers)

class OneClassKmeans(Estimator, HasFeaturesCol, HasPredictionCol, 
                     HasK, HasClusterCenters):
    def __init__(self, featuresCol="features", predictionCol="prediction", k=2,
                 initMode="k-means||", initSteps=2, tol=1e-4, maxIter=20, seed=None):
        # initMode/initSteps/tol/maxIter/seed only mirror the built-in KMeans signature; they are unused here
        super(OneClassKmeans, self).__init__()
        self.setFeaturesCol(featuresCol)
        self.setPredictionCol(predictionCol)
        self.setK(k)
    
    def _fit(self, dataset):
        x = self.getFeaturesCol()
        y = self.getPredictionCol()
        
        dataset.cache()
        dataset = dataset.withColumn(y, F.lit(0))
        
        # convert the Vector column to a plain float array so its elements can be averaged
        udf_to_list = F.udf(lambda x: x.toArray().tolist(), ArrayType(FloatType()))
        dataset = dataset.withColumn(x, udf_to_list(x))
        
        # compute the cluster center (per-feature average)
        n = len(dataset.select(x).first()[0])
        cc = dataset.groupBy(y) \
            .agg(F.array(*[F.avg(F.col(x)[i]) for i in range(n)]).alias('averages')) \
            .first()['averages']
        self.setClusterCenters(cc)
        
        dataset.unpersist()
        
        return (OneClassKmeansModel()
            .setFeaturesCol(self.getFeaturesCol())
            .setPredictionCol(self.getPredictionCol())
            .setK(self.getK())
            .setClusterCenters(self.getClusterCenters()))

class OneClassKmeansModel(Transformer, HasFeaturesCol, HasPredictionCol,
                          HasK, HasClusterCenters):
    def _transform(self, dataset):
        x = self.getFeaturesCol()
        y = self.getPredictionCol()
        # single-cluster model: every row gets prediction 0
        return dataset.withColumn(y, F.lit(0))
    
    def clusterCenters(self):
        return self.getClusterCenters()
    
kmeans = OneClassKmeans(k=1, featuresCol='standardized_user_features', predictionCol='cluster_prediction')
kmeans
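
A short usage sketch, assuming a DataFrame df that has the standardized_user_features Vector column:

model = kmeans.fit(df)             # computes the single centroid in _fit
model.clusterCenters()             # list of per-feature averages
predictions = model.transform(df)  # adds the cluster_prediction column (always 0)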

Set the Log Level

logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)