"Fossies" - the Fresh Open Source Software Archive

Member "kafka-2.2.0-src/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala" (9 Mar 2019, 8524 Bytes) of package /linux/misc/kafka-2.2.0-src.tgz:



/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.coordinator

import java.util.{Collections, Random}
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.Lock

import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.log.Log
import kafka.server._
import kafka.utils._
import kafka.utils.timer.MockTimer
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordConversionStats}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.easymock.EasyMock
import org.junit.{After, Before}

import scala.collection._
import scala.collection.JavaConverters._

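/**
  * Test harness for exercising a coordinator concurrently from multiple threads.
  * Concrete subclasses supply a set of coordinator members and a sequence of
  * operations; the harness runs each operation for every member on a fixed
  * thread pool, in the normal order and in randomized order, using mock time,
  * a mock scheduler and a partially mocked ReplicaManager so that delayed
  * produce requests complete deterministically under test control.
  */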
abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] {

  // Size of the thread pool used to run actions concurrently
  val nThreads = 5

  val time = new MockTime
  val timer = new MockTimer
  val executor = Executors.newFixedThreadPool(nThreads)
  val scheduler = new MockScheduler(time)
  var replicaManager: TestReplicaManager = _ // initialized in setUp()
  var zkClient: KafkaZkClient = _            // initialized in setUp()
  val serverProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
  val random = new Random

  @Before
  def setUp() {
    replicaManager = EasyMock.partialMockBuilder(classOf[TestReplicaManager]).createMock()
    replicaManager.createDelayedProducePurgatory(timer)
    zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
  }

  @After
  def tearDown() {
    EasyMock.reset(replicaManager)
    if (executor != null)
      executor.shutdownNow()
  }

  /**
    * Verify that concurrent operations, when run in the normal sequence,
    * produce the expected results.
    */
  def verifyConcurrentOperations(createMembers: String => Set[M], operations: Seq[Operation]) {
    OrderedOperationSequence(createMembers("verifyConcurrentOperations"), operations).run()
  }

  /**
    * Verify that arbitrary operations, run in some random sequence, don't leave
    * the coordinator in a bad state. Operations in the normal sequence should
    * continue to work as expected.
    */
  def verifyConcurrentRandomSequences(createMembers: String => Set[M], operations: Seq[Operation]) {
    EasyMock.reset(replicaManager)
    for (i <- 0 to 10) {
      // Run some random operations
      RandomOperationSequence(createMembers(s"random$i"), operations).run()

      // Check that proper sequences still work correctly
      OrderedOperationSequence(createMembers(s"ordered$i"), operations).run()
    }
  }
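
  // Editorial sketch (not in the original file): a subclass typically builds a
  // Seq[Operation] and exercises both helpers; the names below are hypothetical:
  //
  //   val allOperations = Seq(new JoinGroupOperation, new SyncGroupOperation)
  //   verifyConcurrentOperations(createGroupMembers, allOperations)
  //   verifyConcurrentRandomSequences(createGroupMembers, allOperations)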

  def verifyConcurrentActions(actions: Set[Action]) {
    val futures = actions.map(executor.submit)
    // Wait for all submitted actions to finish running
    futures.foreach(_.get)
    // Complete any delayed requests, then let each action verify its outcome
    enableCompletion()
    actions.foreach(_.await())
  }

  def enableCompletion(): Unit = {
    replicaManager.tryCompleteDelayedRequests()
    scheduler.tick()
  }

  abstract class OperationSequence(members: Set[M], operations: Seq[Operation]) {
    def actionSequence: Seq[Set[Action]]
    def run(): Unit = {
      actionSequence.foreach(verifyConcurrentActions)
    }
  }

  case class OrderedOperationSequence(members: Set[M], operations: Seq[Operation])
    extends OperationSequence(members, operations) {
    override def actionSequence: Seq[Set[Action]] = {
      operations.map { op =>
        members.map(op.actionWithVerify)
      }
    }
  }

  case class RandomOperationSequence(members: Set[M], operations: Seq[Operation])
    extends OperationSequence(members, operations) {
    val opCount = operations.length
    def actionSequence: Seq[Set[Action]] = {
      (0 to opCount).map { _ =>
        members.map { member =>
          val op = operations(random.nextInt(opCount))
          op.actionNoVerify(member) // Don't wait or verify since these operations may block
        }
      }
    }
  }

  abstract class Operation {
    def run(member: M): Unit
    def awaitAndVerify(member: M): Unit
    def actionWithVerify(member: M): Action = {
      new Action() {
        def run(): Unit = Operation.this.run(member)
        def await(): Unit = awaitAndVerify(member)
      }
    }
    def actionNoVerify(member: M): Action = {
      new Action() {
        def run(): Unit = Operation.this.run(member)
        def await(): Unit = timer.advanceClock(100) // Don't wait since operation may block
      }
    }
  }
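
  // Editorial sketch (not in the original file): a concrete Operation a subclass
  // might supply, where `heartbeat` and `awaitHeartbeat` are hypothetical
  // helpers defined by that subclass:
  //
  //   class HeartbeatOperation extends Operation {
  //     override def run(member: M): Unit = heartbeat(member)
  //     override def awaitAndVerify(member: M): Unit = awaitHeartbeat(member)
  //   }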
}

object AbstractCoordinatorConcurrencyTest {

  trait Action extends Runnable {
    def await(): Unit
  }

  trait CoordinatorMember {
  }

  // ReplicaManager stub: none of the constructor dependencies are used by the
  // tests, so they are all passed as null
  class TestReplicaManager extends ReplicaManager(
    null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, None) {

    var producePurgatory: DelayedOperationPurgatory[DelayedProduce] = _
    var watchKeys: mutable.Set[TopicPartitionOperationKey] = _

    def createDelayedProducePurgatory(timer: MockTimer): Unit = {
      producePurgatory = new DelayedOperationPurgatory[DelayedProduce]("Produce", timer, 1, reaperEnabled = false)
      watchKeys = Collections.newSetFromMap(new ConcurrentHashMap[TopicPartitionOperationKey, java.lang.Boolean]()).asScala
    }

    def tryCompleteDelayedRequests(): Unit = {
      watchKeys.foreach(producePurgatory.checkAndComplete)
    }

    override def appendRecords(timeout: Long,
                               requiredAcks: Short,
                               internalTopicsAllowed: Boolean,
                               isFromClient: Boolean,
                               entriesPerPartition: Map[TopicPartition, MemoryRecords],
                               responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                               delayedProduceLock: Option[Lock] = None,
                               processingStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) {
      if (entriesPerPartition.isEmpty)
        return
      val produceMetadata = ProduceMetadata(1, entriesPerPartition.map {
        case (tp, _) =>
          (tp, ProducePartitionStatus(0L, new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
      })
      val delayedProduce = new DelayedProduce(5, produceMetadata, this, responseCallback, delayedProduceLock) {
        // Complete produce requests after a few attempts to trigger delayed produce from different threads
        val completeAttempts = new AtomicInteger
        override def tryComplete(): Boolean = {
          if (completeAttempts.incrementAndGet() >= 3)
            forceComplete()
          else
            false
        }
        override def onComplete() {
          responseCallback(entriesPerPartition.map {
            case (tp, _) =>
              (tp, new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L))
          })
        }
      }
      val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
      watchKeys ++= producerRequestKeys
      producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
      tryCompleteDelayedRequests()
    }
    override def getMagic(topicPartition: TopicPartition): Option[Byte] = {
      Some(RecordBatch.MAGIC_VALUE_V2)
    }

    @volatile var logs: mutable.Map[TopicPartition, (Log, Long)] = _

    def getOrCreateLogs(): mutable.Map[TopicPartition, (Log, Long)] = {
      if (logs == null)
        logs = mutable.Map[TopicPartition, (Log, Long)]()
      logs
    }

    def updateLog(topicPartition: TopicPartition, log: Log, endOffset: Long): Unit = {
      getOrCreateLogs().put(topicPartition, (log, endOffset))
    }

    override def getLog(topicPartition: TopicPartition): Option[Log] =
      getOrCreateLogs().get(topicPartition).map(l => l._1)

    override def getLogEndOffset(topicPartition: TopicPartition): Option[Long] =
      getOrCreateLogs().get(topicPartition).map(l => l._2)
  }
}
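
As a rough editorial illustration of how the harness above is wired up (a sketch under stated assumptions, not code from the Kafka source), a concrete subclass defines its member type, a member factory and one or more Operations, then calls the verify helpers. The names GroupMember, joinGroup and awaitJoinGroup below are hypothetical, and imports such as org.junit.Test are omitted:

class ExampleConcurrencyTest extends AbstractCoordinatorConcurrencyTest[GroupMember] {

  // Hypothetical member factory: one member per thread, named after the caller
  def createMembers(prefix: String): Set[GroupMember] =
    (0 until nThreads).map(i => new GroupMember(s"$prefix-$i")).toSet

  // Hypothetical operation: run() issues the request, awaitAndVerify() checks the result
  class JoinGroupOperation extends Operation {
    override def run(member: GroupMember): Unit = joinGroup(member)
    override def awaitAndVerify(member: GroupMember): Unit = awaitJoinGroup(member)
  }

  @Test
  def testConcurrentJoins(): Unit = {
    val operations = Seq(new JoinGroupOperation)
    verifyConcurrentOperations(createMembers, operations)      // ordered pass
    verifyConcurrentRandomSequences(createMembers, operations) // randomized passes
  }
}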