Apache spark 在分区列之后触发自定义输出路径

Apache spark 在分区列之后触发自定义输出路径,apache-spark,Apache Spark,在Spark中,按列划分后的路径中是否可能有后缀 例如: 我正在将数据写入以下路径: /数据库名称/表名称/日期ID=20171009/事件类型=测试/ `dataset.write().partitionBy("event_type").save("/db_name/table_name/dateid=20171009");` 是否可以使用动态分区将其创建为以下内容? /数据库名称/表名称/日期ID=20171009/事件类型=测试/1507764830 //sample json {"ev

在Spark中,按列划分后的路径中是否可能有后缀

例如: 我正在将数据写入以下路径: /数据库名称/表名称/日期ID=20171009/事件类型=测试/

`dataset.write().partitionBy("event_type").save("/db_name/table_name/dateid=20171009");`
是否可以使用动态分区将其创建为以下内容? /数据库名称/表名称/日期ID=20171009/事件类型=测试/1507764830

//sample json
{"event_type": "type_A", "dateid":"20171009", "data":"garbage" }
{"event_type": "type_B", "dateid":"20171008", "data":"garbage" }
{"event_type": "type_A", "dateid":"20171007", "data":"garbage" }
{"event_type": "type_B", "dateid":"20171006", "data":"garbage" }

// save as partition
spark.read
  .json("./data/sample.json")
  .write
  .partitionBy("dateid", "event_type").saveAsTable("sample")

//result

阅读源代码后,可以使用
FileOutputCommitter
执行此操作

SparkSession spark = SparkSession
            .builder()
            .master("local[2]")
            .config("spark.sql.parquet.output.committer.class", "com.estudio.spark.ESParquetOutputCommitter")
            .config("spark.sql.sources.commitProtocolClass", "com.estudio.spark.ESSQLHadoopMapReduceCommitProtocol")
            .getOrCreate();

ESSQLHadoopMapReduceCommitProtocol.realAppendMode = false;

spark.range(10000)
.withColumn("type", rand()    
.multiply(6).cast("int"))
.write()
.mode(Append)
.partitionBy("type")
.format("parquet")
.save("/tmp/spark/test1/");
这里是定制的
ParquetOutputCommitter
,它是定制输出路径的地方。在本例中,我们将为时间戳添加后缀。我们必须确保它是同步的。代码如下:

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.hadoop.ParquetOutputCommitter;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Slf4j
public class ESParquetOutputCommitter extends ParquetOutputCommitter {

    private final static Map<String, Path> pathMap = new HashMap<>();

    public final static synchronized Path getNewPath(final Path path) {
        final String key = path.toString();
        log.debug("path.key: {}", key);

        if (pathMap.containsKey(key)) {
            return pathMap.get(key);
        }

        final Path newPath = new Path(path, Long.toString(System.currentTimeMillis()));
        pathMap.put(key, newPath);

        log.info("---> Path: {}, newPath: {}", path, newPath);
        return newPath;
    }

    public ESParquetOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
        super(getNewPath(outputPath), context);
        log.info("this: {}", this);
    }
}
还添加了一个静态标志
realpendmode
,以关闭所有这些功能

再说一次,我还不是火花专家,请告诉我是否有火花
这个解决方案有什么问题吗

结果是
newTaskTempFile
就是这个地方。前一种方法不适用于动态分区

public String newTaskTempFile(TaskAttemptContext taskContext, Option<String> dir, String ext) {
    Option<String> dirWithTimestamp = Option.apply(dir.get() + "/" + timestamp)
    return super.newTaskTempFile(taskContext, dirWithTimestamp, ext);
}
公共字符串newTaskTempFile(TaskAttemptContext taskContext,选项dir,字符串ext){
Option dirWithTimestamp=Option.apply(dir.get()+“/”+时间戳)
返回super.newTaskTempFile(taskContext,dirWithTimestamp,ext);
}

谢谢@L.Li。我试图在分区列之后有一个时间戳,比如:
/sample/dataid=20171006/event\u type=type\u b/1507764830
目前,我是通过向数据集中添加另一列(TS),然后按(dateid,event\u type,TS)分区来完成的,输出路径类似于:
/sample/dataid=20171006/event_type=type_b/_TS_1507764830
如果直接从S3读取,“TS”将成为分区列。这是我想避免的部分。
public String newTaskTempFile(TaskAttemptContext taskContext, Option<String> dir, String ext) {
    Option<String> dirWithTimestamp = Option.apply(dir.get() + "/" + timestamp)
    return super.newTaskTempFile(taskContext, dirWithTimestamp, ext);
}