Apache Spark ve Amazon S3 - Gotchas ve en iyi uygulamalar

S3 bir dosya deposu değil bir nesne deposudur, dolayısıyla nihai tutarlılıktan kaynaklanan sorunların atomik olmayan adların uygulama kodunda ele alınması gerekir. Bir dosya sistemindeki dizin sunucusu, dosya adının karma algoritmasıyla değiştirildi. Bu şeyleri listelemek, dizin işlemleri, silmek ve yeniden adlandırmak için kötü (teknik olarak kopyalamak ve silmek, nesne depolarında yeniden adlandırma yoktur)

S3A (URI şeması: s3a: //) - Hadoop 2.7+ kullanmaya başlayın. S3a, Hadoop 2.7 için önerilen S3 İstemcisidir ve daha sonra S3a daha performanslıdır ve daha büyük dosyaları destekler (5 TB'a kadar) ve çok parçalı yükleme desteği vardır. S3n: // URL'lerinden erişilebilen tüm nesnelere, yalnızca URL şemasını değiştirerek s3a'dan da erişilebilir olmalıdır. S3N'ye karşı hata raporlarının çoğu WONTFIX olarak kapatılır

Spark 2.0.1'in S3a ile çalışmasını sağlamak Spark 2.0.1 için, sınıf yolunuzda hadoop-aws-2.7.3.jar, aws-java-sdk-1.7.4.jar, joda-time-2.9.3.jar kullanın; spark-default.conf dosyasını AWS tuşları ve S3A FileSystemClass ile güncellemeyi unutmayın

Spark.hadoop.fs.s3a.access.key XXXXXXX
spark.hadoop.fs.s3a.secret.key XXXXXXX
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

Sorgu yeniden düzenleme ve aşağı itme yordamı olarak kesinlikle Dataframe'leri kullanın, kutudan çıkarılabilir ve bu nedenle sorgularınızı hızlandırmak için daha az veri alınır.

Aynı verileri birden çok kez okuyorsanız, gerçek bir dosya sisteminin daha iyi dosya okuma performansından yararlanmak için dosyaları yerel EMR kümenize aktarmak için .cache veya s3distcp komutunu kullanın. S3distcp'nin groupBy seçeneği, çok sayıda küçük dosyayı birleştirerek küçük dosya sorununu çözmek için mükemmel bir seçenektir.

Bu da beni çok sayıda küçük dosyanın okunması sorununa getiriyor. Bir araç kullanarak dosyaları birleştirmek bir seçenek değilse, yavaş S3 dizini listeleme darboğaz etrafında etkin bir şekilde çalışan aşağıdaki kodu deneyin

com.amazonaws.services.s3._, model ._ 'ı içe aktar
    com.amazonaws.auth.BasicAWSCredentials alma

    val request = new ListObjectsRequest ()
    request.setBucketName (kepçe)
    request.setPrefix (ön ek)
    request.setMaxKeys (pageLength)
    def s3 = yeni AmazonS3Client (yeni BasicAWSCredentials (anahtar, gizli))

    val objs = s3.listObjects (request) // Bu yöntemin, yukarıda "pageLength" den uzunsa kesilmiş veri verdiğini unutmayın. Bununla ilgilenmen gerekebilir.
    sc.parallelize (objs.getObjectSummaries.map (_. Getkey) .toList)
        .flatMap {key => Source.fromInputStream (s3.getObject (kepçe, anahtar) .getObjectContent: InputStream) .getLines}

Spark.sql.parquet.filterPushdown seçeneğinin doğru olduğundan ve spark.sql.parquet.mergeSchema seçeneğinin yanlış olduğundan emin olun (gerçekten yazmayı yavaşlatan yazılardaki şemaların birleşmesini önlemek için). Neyse ki Spark 2.0 doğru varsayılan değere sahip

Neden bir işin bitmek üzere olduğunu, günlüklere hiçbir şey yazılmadığını ve tüm kıvılcım işlemlerinin durmuş gibi göründüğünü merak ettiniz mi, ancak sonuçlar henüz S3'ün çıktı dizininde değil… neler oluyor? Her defasında, uygulayıcılar işin sonucunu her yazdığında, her biri dosyaların asıl dizinin dışındaki geçici bir dizine yazıyor ve tüm uygulayıcılar bir kez atomik ayrıcalık elde etmek için yeniden adlandırılıyor. Tüm bunlar, adların anlık olduğu, ancak S3 gibi bir nesne deposundaki hdfs gibi standart bir dosya sisteminde gayet iyi, S3'teki adlar 6 MB / s'de yapıldığı için elverişli değildir.

Mümkünse, işlerin çıktısını EMR hdfs'e yazın (neredeyse anlık adlardan yararlanın ve yerel hdflerin daha iyi IO dosyalarından yararlanın) ve dosyaları S3'e taşımak için bir dstcp adımı ekleyin; dosya sistemi olmaya çalışan bir nesne deposu. Ayrıca yerel hdfs'lere yazmak, DirectOutputCommiter ile ilişkili kilitlenme tuzaklarına düşmeden kaçak işleri kontrol etmenizi sağlar.

S3'ü çıktı dizini olarak kullanmanız gerekiyorsa, aşağıdaki Spark yapılandırmalarının ayarlandığından emin olun.

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation yanlış

Not: DirectParquetOutputCommitter, veri kaybı olasılığı nedeniyle Spark 2.0'dan çıkarıldı. Maalesef S3a'dan tutarlılığı geliştirene kadar geçici çözümlerle çalışmak zorundayız. Hadoop 2.8 ile işler düzeliyor

Anahtar sözcükleri sözlük sırasına göre kullanmaktan kaçının. Biri, dolaşmak için karma / rasgele önekleri veya ters tarih-zaman kullanabilir. İşin püf noktası, anahtarlarınızı hiyerarşik olarak adlandırmak, filtrelendiğiniz en yaygın şeyleri anahtarınızın sol tarafına koymaktır. Ve DNS sorunları nedeniyle asla kova adlarında alt çizgi bulunmaz.

Fs.s3a.fast.upload etkinliğini tek bir dosyanın parçalarını Amazon S3'e paralel olarak yükleme

Bu, Spark'ın S3 ile çalışmasını sağlamak için son zamanlarda çözmekte olduğum üretim sorunlarının beyin dökümü idi. Bir sonraki yazıda daha derine indiğimde bunun için bizi izlemeye devam edin…