Read and Write Files
read
# header: whether the file has a header row (i.e. column names)
# inferSchema: infer the schema automatically; in my experience this makes Spark read the file twice
df = sql_ctx.read \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .option('delimiter', ',') \
    .option('quote', '"') \
    .csv('/tmp/csv_test')
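Because inferSchema costs an extra pass over the data, passing an explicit schema avoids it. A minimal sketch, with hypothetical column names (id, name, score) standing in for the real ones:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# hypothetical columns; replace with the actual layout of /tmp/csv_test
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('score', DoubleType(), True),
])

df = sql_ctx.read \
    .option('header', 'true') \
    .option('delimiter', ',') \
    .schema(schema) \
    .csv('/tmp/csv_test')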
write
# header: whether to write a header row (i.e. column names)
# ignoreLeadingWhiteSpace / ignoreTrailingWhiteSpace: whether to strip whitespace before / after values
df.write.mode('overwrite') \
    .option('header', 'true') \
    .option('ignoreLeadingWhiteSpace', False) \
    .option('ignoreTrailingWhiteSpace', False) \
    .csv('/tmp/csv_test')
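Spark writes one part file per partition. If a single output file is needed, coalescing to one partition before the write is a common workaround (it funnels all data through a single task, so only do this for small results). A sketch, with /tmp/csv_single as an assumed output path:
df.coalesce(1).write.mode('overwrite') \
    .option('header', 'true') \
    .csv('/tmp/csv_single')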
Different Data Sources
redshift
SPARK_REDSHIFT = (
    'jdbc:redshift://<url>'
    ':<port>/<schema>?user=<user>&password=<password>'
)
S3_TMP_DIR = '<s3_url>'
# sql query
sql = 'select * from <table> limit 10'
df = sql_ctx.read.format('com.databricks.spark.redshift') \
    .option('url', SPARK_REDSHIFT) \
    .option('query', sql) \
    .option('tempdir', S3_TMP_DIR) \
    .load()
# get data from table
df = sql_ctx.read.format('com.databricks.spark.redshift') \
    .option('url', SPARK_REDSHIFT) \
    .option('dbtable', table_name) \
    .option('tempdir', S3_TMP_DIR) \
    .load()
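The same connector can also write a DataFrame back to Redshift; it stages the data in the S3 temp dir and issues a COPY. A sketch, with '<target_table>' as a placeholder table name (depending on the cluster setup you may also need a credentials option such as 'aws_iam_role'):
df.write.format('com.databricks.spark.redshift') \
    .option('url', SPARK_REDSHIFT) \
    .option('dbtable', '<target_table>') \
    .option('tempdir', S3_TMP_DIR) \
    .mode('append') \
    .save()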
Custom Pipeline
Reference: https://stackoverflow.com/questions/37270446/how-to-roll-a-custom-estimator-in-pyspark-mllib
The code below defines two shared-Param mixins (HasK, HasClusterCenters), an Estimator (OneClassKmeans) whose _fit computes a single cluster center, and the Transformer model it returns.
from pyspark.ml.pipeline import Estimator, Transformer
from pyspark.ml.param.shared import *
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType
class HasK(Params):
    
    k = Param(Params._dummy(), 'k', 'k', TypeConverters.toInt)
    
    def __init__(self):
        super(HasK, self).__init__()
        
    def setK(self, value):
        return self._set(k=value)
    
    def getK(self):
        return self.getOrDefault(self.k)
class HasClusterCenters(Params):

    cluster_centers = Param(Params._dummy(), 'cluster_centers', 'cluster_centers', TypeConverters.toList)

    def __init__(self):
        super(HasClusterCenters, self).__init__()

    def setClusterCenters(self, value):
        return self._set(cluster_centers=value)

    def getClusterCenters(self):
        return self.getOrDefault(self.cluster_centers)
class OneClassKmeans(Estimator, HasFeaturesCol, HasPredictionCol, 
                     HasK, HasClusterCenters):
    def __init__(self, featuresCol="features", predictionCol="prediction", k=2,
                 initMode="k-means||", initSteps=2, tol=1e-4, maxIter=20, seed=None):
        # the KMeans-style arguments (initMode, initSteps, tol, maxIter, seed) are
        # accepted for interface compatibility but ignored by this single-cluster variant
        super(OneClassKmeans, self).__init__()
        self.setFeaturesCol(featuresCol)
        self.setPredictionCol(predictionCol)
        self.setK(k)
    
    def _fit(self, dataset):
        x = self.getFeaturesCol()
        y = self.getPredictionCol()
        
        dataset.cache()
        dataset = dataset.withColumn(y, F.lit(0))
        
        # convert the ML vector column to a plain array so each dimension can be averaged
        udf_to_list = F.udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))
        dataset = dataset.withColumn(x, udf_to_list(x))

        # compute the cluster center: the per-dimension mean over all points
        n = len(dataset.select(x).first()[0])
        cc = dataset.groupBy(y) \
            .agg(F.array(*[F.avg(F.col(x)[i]) for i in range(n)]).alias('averages')) \
            .first()['averages']
        self.setClusterCenters(cc)
        
        dataset.unpersist()
        
        return (OneClassKmeansModel()
            .setFeaturesCol(self.getFeaturesCol())
            .setPredictionCol(self.getPredictionCol())
            .setK(self.getK())
            .setClusterCenters(self.getClusterCenters()))
class OneClassKmeansModel(Transformer, HasFeaturesCol, HasPredictionCol,
                          HasK, HasClusterCenters):
    def _transform(self, dataset):
        # with a single cluster, every row is assigned to cluster 0
        x = self.getFeaturesCol()
        y = self.getPredictionCol()
        return dataset.withColumn(y, F.lit(0))
    
    def clusterCenters(self):
        return self.getClusterCenters()
    
kmeans = OneClassKmeans(k=1, featuresCol='standardized_user_features', predictionCol='cluster_prediction')
kmeans
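A quick usage sketch, assuming df is a DataFrame that contains the vector column standardized_user_features:
model = kmeans.fit(df)            # runs _fit and returns a OneClassKmeansModel
print(model.clusterCenters())     # the single per-dimension mean vector
scored = model.transform(df)      # adds the cluster_prediction column (all zeros here)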
Set Log Level
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
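For the common case, SparkContext also exposes setLogLevel directly, which avoids reaching into the JVM:
sc.setLogLevel('ERROR')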