I found another type inference issue specific to Scala 3 in the context of pekko-streams. This one is a little more involved. This version of the code, which compiles fine with Scala 2,
SplitAfterSize(chunkSize, chunkBufferSize)(atLeastOneByteString)
  .via(getChunkBuffer(chunkSize, chunkBufferSize, maxRetries)) // creates the chunks
  .mergeSubstreamsWithParallelism(parallelism)
  .filter(_.size > 0)
  .via(atLeastOne)
  .zip(requestInfoOrUploadState(s3Location, contentType, s3Headers, initialUploadState))
had to be changed to
val source1: SubFlow[Chunk, NotUsed, Flow[ByteString, ByteString, NotUsed]#Repr, Sink[ByteString, NotUsed]] =
  SplitAfterSize(chunkSize, chunkBufferSize)(atLeastOneByteString)
    .via(getChunkBuffer(chunkSize, chunkBufferSize, maxRetries)) // creates the chunks
val source2 = source1.mergeSubstreamsWithParallelism(parallelism)
  .filter(_.size > 0)
  .via(atLeastOne)
source2
  .zip(requestInfoOrUploadState(s3Location, contentType, s3Headers, initialUploadState))
in order for it to compile with Scala 3.3.0, as you can see in the PR at https://github.com/apache/incubator-pekko-connectors/pull/167/files#r1249747354
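To make the shape of the workaround easier to see without the pekko-streams dependency, here is a minimal sketch that uses plain Scala collections as stand-ins. Everything in it (`SplitChainSketch`, this `Chunk` case class, `toChunks`, `pipeline`) is hypothetical and only mirrors the pattern of splitting one long chain into named values, with an explicit type annotation on the first one. It does not reproduce the compiler error.

```scala
// Hypothetical stand-ins: plain Lists instead of pekko-streams stages.
object SplitChainSketch {
  final case class Chunk(size: Int)

  // Stand-in for the stage that turns raw input into chunks.
  def toChunks(sizes: List[Int]): List[Chunk] = sizes.map(Chunk(_))

  def pipeline(sizes: List[Int], tags: List[String]): List[(Chunk, String)] = {
    // Step 1: pin the intermediate type with an explicit annotation
    // instead of leaving it to inference across the whole chain.
    val stage1: List[Chunk] = toChunks(sizes)
    // Step 2: continue the chain from the annotated value.
    val stage2 = stage1.filter(_.size > 0)
    // Step 3: the final zip hangs off a named value.
    stage2.zip(tags)
  }
}
```

The point of the split is that each named value gives the compiler a fully resolved type to start from, so the later calls no longer depend on what was inferred for the earlier part of the chain.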
The error that the Scala 3.3.0 compiler reports is:
[error] -- Error: /Users/mdedetrich/github/incubator-pekko-connectors/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala:1184:11
[error] 1180 | SplitAfterSize(chunkSize, chunkBufferSize)(atLeastOneByteString)
[error] 1181 | .via(getChunkBuffer(chunkSize, chunkBufferSize, maxRetries)) // creates the chunks
[error] 1182 | .mergeSubstreamsWithParallelism(parallelism)
[error] 1183 | .filter(_.size > 0)
[error] 1184 | .via(atLeastOne)
[error] | ^
[error] |bad parameter reference ([O] =>>
[error] | org.apache.pekko.stream.scaladsl.Flow[
[error] | org.apache.pekko.util.ByteString @uncheckedVariance, O,
[error] | org.apache.pekko.NotUsed @uncheckedVariance]
[error] |) @uncheckedVariance[org.apache.pekko.stream.connectors.s3.impl.Chunk]#Mat at sbt-api
[error] |the parameter is type Mat in class Flow but the prefix ([O] =>>
[error] | org.apache.pekko.stream.scaladsl.Flow[
[error] | org.apache.pekko.util.ByteString @uncheckedVariance, O,
[error] | org.apache.pekko.NotUsed @uncheckedVariance]
[error] |) @uncheckedVariance[org.apache.pekko.stream.connectors.s3.impl.Chunk]
[error] |does not define any corresponding arguments.
[error] |idx = 2, args = org.apache.pekko.stream.connectors.s3.impl.Chunk,
[error] |constraint = uninstantiated variables:
[error] | constrained types:
[error] | bounds:
[error] | ordering:
[error] | co-deps:
[error] | contra-deps:
My initial hunch is that this may be due to the @uncheckedVariance annotations
at https://github.com/apache/incubator-pekko/blob/580f12c29fb61b65758af9b0c31494af0af17175/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala#L69 not being propagated properly.
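Judging from the type lambda in the error message, the annotation in question sits on a type alias of roughly the following shape. This is a heavily simplified sketch, not the actual pekko code: `MiniOps` and `MiniFlow` are hypothetical stand-ins, and the members `f` and `mat` exist only to make the example self-contained. It compiles on its own and is not expected to trigger the error; it only shows where @uncheckedVariance enters the picture.

```scala
import scala.annotation.unchecked.uncheckedVariance

// Hypothetical stand-in for an operations trait with a higher-kinded Repr member.
trait MiniOps[+Out, +Mat] {
  type Repr[+O] <: MiniOps[O, Mat]
  def map[T](g: Out => T): Repr[T]
}

// Hypothetical stand-in for a Flow-like class.
final class MiniFlow[-In, +Out, +Mat](val f: In => Out, val mat: Mat)
    extends MiniOps[Out, Mat] {
  // In and Mat appear in invariant position on the right-hand side of the alias,
  // so the variance check has to be suppressed with @uncheckedVariance.
  override type Repr[+O] = MiniFlow[In @uncheckedVariance, O, Mat @uncheckedVariance]

  override def map[T](g: Out => T): Repr[T] = new MiniFlow(f.andThen(g), mat)
}
```

Every chained call such as `flow.map(...).map(...)` is typed through `Repr`, so the annotated alias is expanded again at each step. This is why a problem in how the compiler carries the annotation along would surface in the middle of a long chain.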