Tuesday, August 7, 2018

Apache Spark 2.x + Derby + AWS Integration


Download & Install Spark

Download Spark 2.3.1 pre-built for Hadoop 2.7 and untar the distribution under /opt. The paths below use /opt/spark-2.3.1-bin-hadoop2.7; /opt/spark refers to the same install (a symlink keeps both paths valid).
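
For example (a sketch; the Apache archive URL and the /opt/spark symlink are conventions assumed here, adjust mirror and version as needed):

cd /opt
wget https://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
tar -xzf spark-2.3.1-bin-hadoop2.7.tgz
ln -s /opt/spark-2.3.1-bin-hadoop2.7 /opt/spark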

Configure Spark Master node

Configure the following environment variables in /opt/spark-2.3.1-bin-hadoop2.7/conf/spark-env.sh. Note that in this example the master port and web UI port are set to custom values (7177 and 8180); both settings are optional.
SPARK_MASTER_PORT=7177
SPARK_MASTER_WEBUI_PORT=8180
SPARK_LOCAL_IP=192.168.1.10
SPARK_WORKER_MEMORY=168g
SPARK_WORKER_CORES=21
SPARK_LOCAL_DIRS=/opt/spark-2.3.1-bin-hadoop2.7/tmp
#SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true"
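
Optionally, start the master by hand once to verify spark-env.sh before wiring it into supervisord, then stop it again (supervisord will own the process later):

cd /opt/spark-2.3.1-bin-hadoop2.7
sbin/start-master.sh
# the web UI should answer on the custom port configured above
curl -s http://192.168.1.10:8180 > /dev/null && echo "master UI up"
sbin/stop-master.sh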

Configure Spark Worker node

Worker nodes use the same spark-env.sh settings, but SPARK_MASTER_HOST must also be defined so the worker can reach the master:
SPARK_MASTER_HOST=192.168.1.10
SPARK_LOCAL_IP=192.168.1.11
SPARK_WORKER_MEMORY=168g
SPARK_WORKER_CORES=21
SPARK_LOCAL_DIRS=/opt/spark-2.3.1-bin-hadoop2.7/tmp
#SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true"
mkdir -p /var/lib/spark/rdd

useradd -u 1120 spark
chown -R spark: /var/lib/spark
chmod -R g+w /var/lib/spark
mkdir -p /opt/spark-2.3.1-bin-hadoop2.7/tmp
chmod 777 /opt/spark-2.3.1-bin-hadoop2.7/tmp
setfacl -Rdm g::rwx /opt/spark-2.3.1-bin-hadoop2.7/tmp
setfacl -Rdm o::rwx /opt/spark-2.3.1-bin-hadoop2.7/tmp
getfacl /opt/spark-2.3.1-bin-hadoop2.7/tmp
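
As with the master, a one-off manual start is a quick way to confirm the worker registers before handing it over to supervisord:

cd /opt/spark-2.3.1-bin-hadoop2.7
sbin/start-slave.sh spark://192.168.1.10:7177
# the worker should now show up as ALIVE in the master web UI (port 8180)
sbin/stop-slave.sh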

Copy the following jars from your Derby Network Server Install

scp root@192.168.1.10:/opt/derby/lib/derbytools.jar /opt/spark/jars
scp root@192.168.1.10:/opt/derby/lib/derbyclient.jar /opt/spark/jars
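
With derbyclient.jar on the classpath, a quick JDBC round-trip from spark-shell confirms Spark can reach the Derby Network Server. This is only a sketch: the database name (toursdb), table (APP.AIRLINES) and port 1527 are placeholders for your own Derby setup.

/opt/spark/bin/spark-shell << 'EOF'
// read one table from the Derby Network Server over JDBC
val df = spark.read.format("jdbc").
  option("url", "jdbc:derby://192.168.1.10:1527/toursdb").
  option("driver", "org.apache.derby.jdbc.ClientDriver").
  option("dbtable", "APP.AIRLINES").
  load()
df.show(5)
EOF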

Download hadoop-aws-2.7.x.jar and aws-java-sdk-1.7.4.jar and place them under /opt/spark/jars/.
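
For example, straight from Maven Central (a sketch; match the hadoop-aws version to your Hadoop build, 2.7.3 here to line up with spark-defaults.conf below):

cd /opt/spark/jars
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar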

Create the following spark-defaults.conf; the same config can be used on both master and worker nodes.

cat > /opt/spark/conf/spark-defaults.conf << EOF
spark.master                     spark://192.168.1.10:7177

# spark.eventLog.enabled         true
# spark.eventLog.dir             hdfs://namenode:8021/directory
# spark.serializer               org.apache.spark.serializer.KryoSerializer

spark.sql.warehouse.dir      /opt/spark-2.3.1-bin-hadoop2.7/tmp/warehouse
spark.driver.memory              2g
spark.executor.memory            2g
spark.executor.cores             2
spark.jars                       /opt/spark-2.3.1-bin-hadoop2.7/jars/hadoop-aws-2.7.3.jar,/opt/spark-2.3.1-bin-hadoop2.7/jars/aws-java-sdk-1.7.4.jar
spark.task.reaper.enabled        true
spark.task.reaper.killTimeout    300s
#spark.driver.extraJavaOptions    -Dderby.system.home=/opt/spark-2.3.1-bin-hadoop2.7/tmp/derby
EOF
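
With the two AWS jars listed in spark.jars, S3 is reachable through the s3a:// scheme. A minimal smoke test from spark-shell could look like this; the bucket, path and credentials are placeholders, and the fs.s3a.* keys can just as well live in spark-defaults.conf or come from instance profiles:

/opt/spark/bin/spark-shell \
  --conf spark.hadoop.fs.s3a.access.key=YOUR_ACCESS_KEY \
  --conf spark.hadoop.fs.s3a.secret.key=YOUR_SECRET_KEY << 'EOF'
// read a few lines from a placeholder S3 location over s3a://
val df = spark.read.text("s3a://your-bucket/some/prefix/")
df.show(5, false)
EOF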

Integrate Spark master and worker with supervisord

The following scripts need to be placed under $SPARK_HOME/sbin/. They are modified from the original spark-daemon.sh, start-master.sh and start-slave.sh: each one launches its process with exec, so the process keeps the parent PID and supervisord can manage it directly (a minimal sketch of the master wrapper follows the list below).
supervisor-spark-daemon.sh
supervisor-start-master.sh
supervisor-start-slave.sh
supervisor-start-thriftserver.sh
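
For reference, a minimal sketch of what an exec-style supervisor-start-master.sh can look like (the actual scripts in this post are adapted from Spark's own start-master.sh; this is only meant to illustrate the exec idea, and it assumes spark-env.sh sets SPARK_LOCAL_IP and the port variables):

#!/usr/bin/env bash
# supervisor-start-master.sh (sketch): run the Master in the foreground so
# supervisord tracks the real PID instead of a detached daemon
SPARK_HOME=${SPARK_HOME:-/opt/spark-2.3.1-bin-hadoop2.7}
. "${SPARK_HOME}/conf/spark-env.sh"
exec "${SPARK_HOME}/bin/spark-class" org.apache.spark.deploy.master.Master \
  --host "${SPARK_LOCAL_IP}" \
  --port "${SPARK_MASTER_PORT:-7077}" \
  --webui-port "${SPARK_MASTER_WEBUI_PORT:-8080}"
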
Create the supervisord config for the master node:
cat > /etc/supervisord.d/spark23-master.ini << EOF
[program:spark23-master]
environment=SPARK_NO_DAEMONIZE="true"
command=/opt/spark-2.3.1-bin-hadoop2.7/sbin/supervisor-start-master.sh           ; the program (relative uses PATH, can take args)
process_name=%(program_name)s  ; process_name expr (default %(program_name)s)
numprocs=1                     ; number of process copies to start (def 1)
directory=/opt/spark-2.3.1-bin-hadoop2.7           ; directory to cwd to before exec (def no cwd)
;umask=022                     ; umask for process (default None)
priority=3                     ; the relative start priority (default 999)
;autostart=true                ; start at supervisord start (default: true)
;autorestart=true              ; restart at unexpected quit (default: true)
startsecs=10                    ; number of secs prog must stay running (def. 1)
;startretries=3                ; max # of serial start failures (default 3)
;exitcodes=0,2                 ; 'expected' exit codes for process (default 0,2)
;stopsignal=QUIT               ; signal used to kill process (default TERM)
;stopwaitsecs=10               ; max num secs to wait b4 SIGKILL (default 10)
user=spark                     ; setuid to this UNIX account to run the program
redirect_stderr=true          ; redirect proc stderr to stdout (default false)
stdout_logfile=/opt/spark-2.3.1-bin-hadoop2.7/logs/%(program_name)s-stdout.log        ; stdout log path, NONE for none; default AUTO
;stdout_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stdout_logfile_backups=10     ; # of stdout logfile backups (default 10)
;stdout_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
;stdout_events_enabled=false   ; emit events on stdout writes (default false)
;stderr_logfile=/a/path        ; stderr log path, NONE for none; default AUTO
;stderr_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10     ; # of stderr logfile backups (default 10)
;stderr_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
;stderr_events_enabled=false   ; emit events on stderr writes (default false)
;environment=A=1,B=2           ; process environment additions (def no adds)
;serverurl=AUTO                ; override serverurl computation (childutils)
EOF
Create the supervisord config for the worker node:
cat > /etc/supervisord.d/spark23-slave.ini << EOF
[program:spark23-slave]
environment=SPARK_NO_DAEMONIZE="true"
command=/opt/spark-2.3.1-bin-hadoop2.7/sbin/supervisor-start-slave.sh spark://192.168.1.10:7177 ; the program (relative uses PATH, can take args)
process_name=%(program_name)s  ; process_name expr (default %(program_name)s)
numprocs=1                     ; number of process copies to start (def 1)
directory=/opt/spark-2.3.1-bin-hadoop2.7 ; directory to cwd to before exec (def no cwd)
;umask=022                     ; umask for process (default None)
priority=4                     ; the relative start priority (default 999)
;autostart=true                ; start at supervisord start (default: true)
;autorestart=true              ; restart at unexpected quit (default: true)
startsecs=10                    ; number of secs prog must stay running (def. 1)
;startretries=3                ; max # of serial start failures (default 3)
;exitcodes=0,2                 ; 'expected' exit codes for process (default 0,2)
;stopsignal=QUIT               ; signal used to kill process (default TERM)
;stopwaitsecs=10               ; max num secs to wait b4 SIGKILL (default 10)
user=spark                     ; setuid to this UNIX account to run the program
redirect_stderr=true          ; redirect proc stderr to stdout (default false)
stdout_logfile=/opt/spark-2.3.1-bin-hadoop2.7/logs/%(program_name)s-stdout.log        ; stdout log path, NONE for none; default AUTO
;stdout_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stdout_logfile_backups=10     ; # of stdout logfile backups (default 10)
;stdout_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
;stdout_events_enabled=false   ; emit events on stdout writes (default false)
;stderr_logfile=/a/path        ; stderr log path, NONE for none; default AUTO
;stderr_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10     ; # of stderr logfile backups (default 10)
;stderr_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
;stderr_events_enabled=false   ; emit events on stderr writes (default false)
;environment=A=1,B=2           ; process environment additions (def no adds)
;serverurl=AUTO                ; override serverurl computation (childutils)
EOF
Add the new configs to supervisord and start the services:
sudo supervisorctl reread
sudo supervisorctl update
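
A quick check that both processes came up cleanly (the names and log path match the configs above):

sudo supervisorctl status spark23-master spark23-slave
tail -n 50 /opt/spark-2.3.1-bin-hadoop2.7/logs/spark23-master-stdout.log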
