diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala index 4fad15ca3..5f725e05e 100644 --- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala +++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala @@ -31,6 +31,9 @@ private[ftp] trait FtpsOperations extends CommonFtpOperations { Try { connectionSettings.proxy.foreach(ftpClient.setProxy) + connectionSettings.keyManager.foreach(ftpClient.setKeyManager) + connectionSettings.trustManager.foreach(ftpClient.setTrustManager) + if (ftpClient.getAutodetectUTF8() != connectionSettings.autodetectUTF8) { ftpClient.setAutodetectUTF8(connectionSettings.autodetectUTF8) } diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala index e489d09dd..0a05dd994 100644 --- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala +++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala @@ -15,6 +15,8 @@ package org.apache.pekko.stream.connectors.ftp import java.net.InetAddress import java.net.Proxy +import javax.net.ssl.KeyManager +import javax.net.ssl.TrustManager import java.nio.file.attribute.PosixFilePermission import org.apache.pekko.annotation.{ DoNotInherit, InternalApi } @@ -185,7 +187,9 @@ final class FtpsSettings private ( val passiveMode: Boolean, val autodetectUTF8: Boolean, val configureConnection: FTPSClient => Unit, - val proxy: Option[Proxy]) extends FtpFileSettings { + val proxy: Option[Proxy], + val keyManager: Option[KeyManager], + val trustManager: Option[TrustManager]) extends FtpFileSettings { def withHost(value: java.net.InetAddress): FtpsSettings = copy(host = value) def withPort(value: Int): FtpsSettings = copy(port = value) @@ -196,6 +200,8 @@ final class FtpsSettings private ( def withAutodetectUTF8(value: Boolean): FtpsSettings = if (autodetectUTF8 == value) this else copy(autodetectUTF8 = value) def withProxy(value: Proxy): FtpsSettings = copy(proxy = Some(value)) + def withKeyManager(value: KeyManager): FtpsSettings = copy(keyManager = Some(value)) + def withTrustManager(value: TrustManager): FtpsSettings = copy(trustManager = Some(value)) /** * Scala API: @@ -220,7 +226,9 @@ final class FtpsSettings private ( passiveMode: Boolean = passiveMode, autodetectUTF8: Boolean = autodetectUTF8, configureConnection: FTPSClient => Unit = configureConnection, - proxy: Option[Proxy] = proxy): FtpsSettings = new FtpsSettings( + proxy: Option[Proxy] = proxy, + keyManager: Option[KeyManager] = keyManager, + trustManager: Option[TrustManager] = trustManager): FtpsSettings = new FtpsSettings( host = host, port = port, credentials = credentials, @@ -228,7 +236,9 @@ final class FtpsSettings private ( passiveMode = passiveMode, autodetectUTF8 = autodetectUTF8, configureConnection = configureConnection, - proxy = proxy) + proxy = proxy, + keyManager = keyManager, + trustManager = trustManager) override def toString = "FtpsSettings(" + @@ -239,7 +249,9 @@ final class FtpsSettings private ( s"passiveMode=$passiveMode," + s"autodetectUTF8=$autodetectUTF8" + s"configureConnection=$configureConnection," + - s"proxy=$proxy)" + s"proxy=$proxy" + + s"keyManager=$keyManager" + + s"trustManager=$trustManager)" } /** @@ -259,7 +271,9 @@ object FtpsSettings { passiveMode = false, autodetectUTF8 = false, configureConnection = _ => (), - proxy = None) + proxy = None, + keyManager = None, + trustManager = None) /** Java API */ def create(host: java.net.InetAddress): FtpsSettings = apply( diff --git a/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/FtpsWithTrustAndKeyManagersStageSpec.scala b/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/FtpsWithTrustAndKeyManagersStageSpec.scala new file mode 100644 index 000000000..6fb9557b5 --- /dev/null +++ b/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/FtpsWithTrustAndKeyManagersStageSpec.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.connectors.ftp + +import org.apache.pekko +import pekko.stream.IOResult +import pekko.stream.scaladsl.{ Sink, Source } +import pekko.util.ByteString +import pekko.{ Done, NotUsed } +import org.mockito.ArgumentMatchers.{ any, anyString } +import org.mockito.Mockito.{ atLeastOnce, doNothing, verify } +import org.scalatestplus.mockito.MockitoSugar + +import java.net.{ InetAddress, Socket } +import java.security.cert.X509Certificate +import javax.net.ssl.{ X509ExtendedKeyManager, X509ExtendedTrustManager } +import scala.concurrent.Future + +class FtpsWithTrustAndKeyManagersStageSpec extends BaseFtpsSpec with CommonFtpStageSpec with MockitoSugar { + + // The implementation of X509ExtendedTrustManager and X509ExtendedKeyManager is final so + // its not possible to put a Mockito spy on it, instead lets just mock the classes and the + // checkServerTrusted method which is executed only when trustManager/keyManager is setup in FtpsSettings + + val keyManager: X509ExtendedKeyManager = mock[X509ExtendedKeyManager] + val trustManager: X509ExtendedTrustManager = mock[X509ExtendedTrustManager] + + doNothing().when(trustManager).checkServerTrusted(any(classOf[Array[X509Certificate]]), anyString, + any(classOf[Socket])) + + override val settings = + FtpsSettings( + InetAddress.getByName(HOSTNAME)).withPort(PORT) + .withCredentials(CREDENTIALS) + .withBinary(true) + .withPassiveMode(true) + .withTrustManager(trustManager) + .withKeyManager(keyManager) + + private def verifyServerCheckCertificate(): Unit = + verify(trustManager, atLeastOnce()).checkServerTrusted(any(classOf[Array[X509Certificate]]), anyString, + any(classOf[Socket])) + + private def verifyAfterStream[O, Mat](source: Source[O, Mat]): Source[O, Mat] = + source.map { result => + verifyServerCheckCertificate() + result + } + + private def verifyAfterStream[I, Mat](sink: Sink[I, Mat]): Sink[I, Mat] = + sink.mapMaterializedValue { result => + verifyServerCheckCertificate() + result + } + + override protected def listFiles(basePath: String): Source[FtpFile, NotUsed] = + verifyAfterStream(super.listFiles(basePath)) + + override protected def listFilesWithFilter(basePath: String, branchSelector: FtpFile => Boolean, + emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] = + verifyAfterStream(super.listFilesWithFilter(basePath, branchSelector, emitTraversedDirectories)) + + override protected def retrieveFromPath(path: String, fromRoot: Boolean): Source[ByteString, Future[IOResult]] = + verifyAfterStream(super.retrieveFromPath(path, fromRoot)) + + override protected def retrieveFromPathWithOffset(path: String, offset: Long): Source[ByteString, Future[IOResult]] = + verifyAfterStream(super.retrieveFromPathWithOffset(path, offset)) + + override protected def storeToPath(path: String, append: Boolean): Sink[ByteString, Future[IOResult]] = + verifyAfterStream(super.storeToPath(path, append)) + + override protected def remove(): Sink[FtpFile, Future[IOResult]] = + verifyAfterStream(super.remove()) + + override protected def move(destinationPath: FtpFile => String): Sink[FtpFile, Future[IOResult]] = + verifyAfterStream(super.move(destinationPath)) + + override protected def mkdir(basePath: String, name: String): Source[Done, NotUsed] = + verifyAfterStream(super.mkdir(basePath, name)) + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 140406c08..bb6e2d208 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -171,7 +171,7 @@ object Dependencies { val Ftp = Seq( libraryDependencies ++= Seq( "commons-net" % "commons-net" % "3.8.0", - "com.hierynomus" % "sshj" % "0.33.0")) + "com.hierynomus" % "sshj" % "0.33.0") ++ Mockito) val GeodeVersion = "1.15.0" val GeodeVersionForDocs = "115"