Use more fields from StreamSettings

This commit is contained in:
PabloMK7 2024-03-05 19:28:21 +01:00
parent fcf3430788
commit 652dbecf48
6 changed files with 25 additions and 34 deletions

View file

@ -169,13 +169,13 @@ func (pc *PRUDPConnection) resetHeartbeat() {
}
if pc.heartbeatTimer != nil {
pc.heartbeatTimer.Reset(pc.endpoint.Server.pingTimeout) // TODO - This is part of StreamSettings
pc.heartbeatTimer.Reset(time.Duration(pc.StreamSettings.MaxSilenceTime) * time.Millisecond)
}
}
func (pc *PRUDPConnection) startHeartbeat() {
endpoint := pc.endpoint
server := endpoint.Server
maxSilenceTime := time.Duration(pc.StreamSettings.MaxSilenceTime) * time.Millisecond
// * Every time a packet is sent, connection.resetHeartbeat()
// * is called which resets this timer. If this function
@ -183,12 +183,12 @@ func (pc *PRUDPConnection) startHeartbeat() {
// * in the expected time frame. If this happens, send
// * the client a PING packet to try and kick start the
// * heartbeat again
pc.heartbeatTimer = time.AfterFunc(server.pingTimeout, func() {
pc.heartbeatTimer = time.AfterFunc(maxSilenceTime, func() {
endpoint.sendPing(pc)
// * If the heartbeat still did not restart, assume the
// * connection is dead and clean up
pc.pingKickTimer = time.AfterFunc(server.pingTimeout, func() {
pc.pingKickTimer = time.AfterFunc(maxSilenceTime, func() {
pc.cleanup() // * "removed" event is dispatched here
discriminator := fmt.Sprintf("%s-%d-%d", pc.Socket.Address.String(), pc.StreamType, pc.StreamID)

View file

@ -6,7 +6,6 @@ import (
"fmt"
"net"
"runtime"
"time"
"github.com/lxzan/gws"
)
@ -22,7 +21,6 @@ type PRUDPServer struct {
KerberosTicketVersion int
SessionKeyLength int
FragmentSize int
pingTimeout time.Duration
PRUDPv1ConnectionSignatureKey []byte
LibraryVersions *LibraryVersions
ByteStreamSettings *ByteStreamSettings
@ -330,7 +328,6 @@ func NewPRUDPServer() *PRUDPServer {
Connections: NewMutexMap[string, *SocketConnection](),
SessionKeyLength: 32,
FragmentSize: 1300,
pingTimeout: time.Second * 15,
LibraryVersions: NewLibraryVersions(),
ByteStreamSettings: NewByteStreamSettings(),
PRUDPV0Settings: NewPRUDPV0Settings(),

View file

@ -2,7 +2,6 @@ package nex
import (
"crypto/rc4"
"time"
)
// ReliablePacketSubstreamManager represents a substream manager for reliable PRUDP packets
@ -85,7 +84,7 @@ func NewReliablePacketSubstreamManager(startingIncomingSequenceID, startingOutgo
packetMap: NewMutexMap[uint16, PRUDPPacketInterface](),
incomingSequenceIDCounter: NewCounter[uint16](startingIncomingSequenceID),
outgoingSequenceIDCounter: NewCounter[uint16](startingOutgoingSequenceID),
ResendScheduler: NewResendScheduler(5, time.Second, 0),
ResendScheduler: NewResendScheduler(),
}
psm.SetCipherKey([]byte("CD&ML"))

View file

@ -35,8 +35,6 @@ func (pi *PendingPacket) startResendTimer() {
// ResendScheduler manages the resending of reliable PRUDP packets
type ResendScheduler struct {
packets *MutexMap[uint16, *PendingPacket]
Interval time.Duration
Increase time.Duration
}
// Stop kills the resend scheduler and stops all pending packets
@ -62,10 +60,13 @@ func (rs *ResendScheduler) Stop() {
// AddPacket adds a packet to the scheduler and begins it's timer
func (rs *ResendScheduler) AddPacket(packet PRUDPPacketInterface) {
connection := packet.Sender().(*PRUDPConnection)
slidingWindow := connection.SlidingWindow(packet.SubstreamID())
pendingPacket := &PendingPacket{
packet: packet,
rs: rs,
interval: rs.Interval,
interval: time.Duration(slidingWindow.streamSettings.KeepAliveTimeout) * time.Millisecond,
}
rs.packets.Set(packet.SequenceID(), pendingPacket)
@ -106,28 +107,26 @@ func (rs *ResendScheduler) resendPacket(pendingPacket *PendingPacket) {
return
}
if time.Since(pendingPacket.lastSendTime) >= rs.Interval {
if time.Since(pendingPacket.lastSendTime) >= time.Duration(slidingWindow.streamSettings.KeepAliveTimeout) * time.Millisecond {
// * Resend the packet to the connection
server := connection.endpoint.Server
data := packet.Bytes()
server.sendRaw(connection.Socket, data)
pendingPacket.interval += rs.Increase
pendingPacket.ticker.Reset(pendingPacket.interval)
pendingPacket.resendCount++
if (pendingPacket.resendCount < slidingWindow.streamSettings.ExtraRestransmitTimeoutTrigger) {
pendingPacket.interval += time.Duration(uint32(float32(slidingWindow.streamSettings.KeepAliveTimeout) * slidingWindow.streamSettings.RetransmitTimeoutMultiplier)) * time.Millisecond
} else {
pendingPacket.interval += time.Duration(uint32(float32(slidingWindow.streamSettings.KeepAliveTimeout) * slidingWindow.streamSettings.ExtraRetransmitTimeoutMultiplier)) * time.Millisecond
}
pendingPacket.ticker.Reset(pendingPacket.interval)
pendingPacket.lastSendTime = time.Now()
}
}
// NewResendScheduler creates a new ResendScheduler with the provided max resend count and interval and increase durations
//
// If increase is non-zero then every resend will have it's duration increased by that amount. For example an interval of
// 1 second and an increase of 5 seconds. The 1st resend happens after 1 second, the 2nd will take place 6 seconds
// after the 1st, and the 3rd will take place 11 seconds after the 2nd
func NewResendScheduler(maxResendCount int, interval, increase time.Duration) *ResendScheduler {
// NewResendScheduler creates a new ResendScheduler
func NewResendScheduler() *ResendScheduler {
return &ResendScheduler{
packets: NewMutexMap[uint16, *PendingPacket](),
Interval: interval,
Increase: increase,
}
}

View file

@ -1,9 +1,5 @@
package nex
import (
"time"
)
// SlidingWindow is an implementation of rdv::SlidingWindow.
// SlidingWindow reorders pending reliable packets to ensure they are handled in the expected order.
// In the original library each virtual connection stream only uses a single SlidingWindow, but starting
@ -74,7 +70,7 @@ func NewSlidingWindow() *SlidingWindow {
pendingPackets: NewMutexMap[uint16, PRUDPPacketInterface](),
incomingSequenceIDCounter: NewCounter[uint16](0),
outgoingSequenceIDCounter: NewCounter[uint16](0),
ResendScheduler: NewResendScheduler(5, time.Second, 0),
ResendScheduler: NewResendScheduler(),
}
return sw

View file

@ -12,19 +12,19 @@ import (
// The original library has more settings which are not present here as their use is unknown.
// Not all values are used at this time, and only exist to future-proof for a later time.
type StreamSettings struct {
ExtraRestransmitTimeoutTrigger uint32 // * Unused. The number of times a packet can be retransmitted before ExtraRetransmitTimeoutMultiplier is used
ExtraRestransmitTimeoutTrigger uint32 // * The number of times a packet can be retransmitted before ExtraRetransmitTimeoutMultiplier is used
MaxPacketRetransmissions uint32 // * The number of times a packet can be retransmitted before the timeout time is checked
KeepAliveTimeout uint32 // * Unused. Presumably the time a packet can be alive for without acknowledgement? Milliseconds?
KeepAliveTimeout uint32 // * Presumably the time a packet can be alive for without acknowledgement? Milliseconds?
ChecksumBase uint32 // * Unused. The base value for PRUDPv0 checksum calculations
FaultDetectionEnabled bool // * Unused. Presumably used to detect PIA faults?
InitialRTT uint32 // * Unused. The connections initial RTT
EncryptionAlgorithm encryption.Algorithm // * The encryption algorithm used for packet payloads
ExtraRetransmitTimeoutMultiplier float32 // * Unused. Used as part of the RTO calculations when retransmitting a packet. Only used if ExtraRestransmitTimeoutTrigger has been reached
ExtraRetransmitTimeoutMultiplier float32 // * Used as part of the RTO calculations when retransmitting a packet. Only used if ExtraRestransmitTimeoutTrigger has been reached
WindowSize uint32 // * Unused. The max number of (reliable?) packets allowed in a SlidingWindow
CompressionAlgorithm compression.Algorithm // * The compression algorithm used for packet payloads
RTTRetransmit uint32 // * Unused. Unknown use
RetransmitTimeoutMultiplier float32 // * Unused. Used as part of the RTO calculations when retransmitting a packet. Only used if ExtraRestransmitTimeoutTrigger has not been reached
MaxSilenceTime uint32 // * Unused. Presumably the time a connection can go without any packets from the other side? Milliseconds?
RetransmitTimeoutMultiplier float32 // * Used as part of the RTO calculations when retransmitting a packet. Only used if ExtraRestransmitTimeoutTrigger has not been reached
MaxSilenceTime uint32 // * Presumably the time a connection can go without any packets from the other side? Milliseconds?
}
// Copy returns a new copy of the settings