Commit new messages and fix message length mistake

This commit is contained in:
Leon Grünewald 2024-09-17 00:24:35 +02:00
parent 9865c10f40
commit ff9b2d3f96
3 changed files with 112 additions and 61 deletions

View file

@ -1,9 +1,9 @@
use anyhow::bail; use anyhow::bail;
use bytes::{BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use prost::Message; use prost::Message;
use tokio_util::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
use crate::MumbleMessage; use crate::MumbleMessage;
use crate::proto::{Authenticate, Version, Ping}; use crate::proto::{Authenticate, Version, Ping, CryptSetup, CodecVersion, ChannelState, PermissionQuery, UserState, ServerSync, ServerConfig, UdpTunnel, UserRemove};
pub struct MumbleTcpCodec {} pub struct MumbleTcpCodec {}
@ -26,30 +26,55 @@ impl Decoder for MumbleTcpCodec {
let (message_length, rest) = rest.split_at(4); let (message_length, rest) = rest.split_at(4);
let message_type = u16::from_be_bytes(message_type.try_into()?) as usize; let message_type = u16::from_be_bytes(message_type.try_into()?) as usize;
let message_length = u32::from_be_bytes(message_length.try_into()?) as usize; let message_length = u32::from_be_bytes(message_length.try_into()?) as usize;
let mut mumble_message: Option<MumbleMessage> = None;
if message_length > src.len() { if message_length+6 > src.len() {
src.reserve(message_length+6); src.reserve(message_length+6-src.len());
return Ok(None); return Ok(None);
} }
let (message_data, _) = rest.split_at(message_length);
let mut mumble_message: Option<MumbleMessage> = None;
match message_type { match message_type {
0 => { 0 => {
//TODO: Version mumble_message = Some(MumbleMessage::Version { data: Version::decode(message_data)? });
let version = Version::decode(rest)?; }
mumble_message = Some(MumbleMessage::Version {data: version}); 1 => {
mumble_message = Some(MumbleMessage::UDPTunnel { data: UdpTunnel::decode(message_data)? });
} }
3 => { 3 => {
let ping = Ping::decode(rest)?; mumble_message = Some(MumbleMessage::Ping { data: Ping::decode(message_data)? });
mumble_message = Some(MumbleMessage::Ping { data: ping }); },
5 => {
mumble_message = Some(MumbleMessage::ServerSync { data: ServerSync::decode(message_data)? });
}
7 => {
mumble_message = Some(MumbleMessage::ChannelState { data: ChannelState::decode(message_data)? });
}
8 => {
mumble_message = Some(MumbleMessage::UserRemove { data: UserRemove::decode(message_data)? });
}
9 => {
mumble_message = Some(MumbleMessage::UserState { data: UserState::decode(message_data)? });
},
15 => {
mumble_message = Some(MumbleMessage::CryptSetup { data: CryptSetup::decode(message_data)? });
}
20 => {
mumble_message = Some(MumbleMessage::PermissionQuery {data: PermissionQuery::decode(message_data)?});
}
21 => {
mumble_message = Some(MumbleMessage::CodecVersion {data: CodecVersion::decode(message_data)?});
},
24 => {
mumble_message = Some(MumbleMessage::ServerConfig {data: ServerConfig::decode(message_data)?});
} }
_ => { _ => {
eprintln!("Unknown message type {:?}", message_type); eprintln!("Unknown message type {:?}", message_type);
} }
} }
let _ = src.split_to(message_length+6); src.advance(message_length+6);
src.reserve(6);
Ok(mumble_message) Ok(mumble_message)
} }
} }
@ -61,17 +86,14 @@ impl Encoder<MumbleMessage> for MumbleTcpCodec {
let mut message = BytesMut::new(); let mut message = BytesMut::new();
match item { match item {
MumbleMessage::Version { data } => { MumbleMessage::Version { data } => {
println!("Send Version");
Version::encode(&data, &mut message)?; Version::encode(&data, &mut message)?;
dst.put_u16(0); dst.put_u16(0);
} }
MumbleMessage::Authenticate { data } => { MumbleMessage::Authenticate { data } => {
println!("Send Authenticate");
Authenticate::encode(&data, &mut message)?; Authenticate::encode(&data, &mut message)?;
dst.put_u16(2); dst.put_u16(2);
}, },
MumbleMessage::Ping { data } => { MumbleMessage::Ping { data } => {
println!("Send Ping");
Ping::encode(&data, &mut message)?; Ping::encode(&data, &mut message)?;
dst.put_u16(3); dst.put_u16(3);
} }
@ -80,9 +102,8 @@ impl Encoder<MumbleMessage> for MumbleTcpCodec {
} }
} }
dst.put_u32((message.len() as u32) + 6); dst.put_u32(message.len() as u32);
dst.extend_from_slice(&message); dst.extend_from_slice(&message);
println!("{dst:?}");
Ok(()) Ok(())
} }
} }

View file

@ -19,17 +19,27 @@ use tokio_rustls::rustls::{ClientConfig, DigitallySignedStruct, Error, RootCertS
use tokio_rustls::client::TlsStream; use tokio_rustls::client::TlsStream;
use tokio_rustls::{TlsConnector}; use tokio_rustls::{TlsConnector};
use std::sync::Arc; use std::sync::Arc;
use std::time::SystemTime;
use rustls_pki_types::{CertificateDer, ServerName, UnixTime}; use rustls_pki_types::{CertificateDer, ServerName, UnixTime};
use tokio_util::codec::Framed; use tokio_util::codec::Framed;
use crate::codec::MumbleTcpCodec; use crate::codec::MumbleTcpCodec;
use tokio_rustls::rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}; use tokio_rustls::rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
use crate::proto::{Authenticate, Version, Ping}; use crate::proto::{Authenticate, Version, Ping, CryptSetup, CodecVersion, ChannelState, PermissionQuery, UserState, ServerSync, ServerConfig, UdpTunnel, UserRemove};
#[derive(Debug)] #[derive(Debug)]
pub enum MumbleMessage { pub enum MumbleMessage {
Version {data: Version}, Version {data: Version},
UDPTunnel { data: UdpTunnel },
Authenticate {data: Authenticate}, Authenticate {data: Authenticate},
Ping { data: Ping }, Ping { data: Ping },
ServerSync { data: ServerSync },
ChannelState { data: ChannelState },
UserRemove { data: UserRemove },
UserState { data: UserState },
CryptSetup { data: CryptSetup },
PermissionQuery { data: PermissionQuery },
CodecVersion { data: CodecVersion },
ServerConfig { data: ServerConfig },
UnknownMessage UnknownMessage
} }
@ -110,53 +120,27 @@ impl MumbleClient {
Ok(sock) Ok(sock)
} }
pub async fn connect(&mut self) -> anyhow::Result<()> { pub async fn connect(&mut self) -> anyhow::Result<Framed<TlsStream<TcpStream>, MumbleTcpCodec>> {
let mut framed = Framed::new(self.create_tcp_connection().await?, MumbleTcpCodec::new()); let mut framed = Framed::new(self.create_tcp_connection().await?, MumbleTcpCodec::new());
let version_v1 = (1u16 as u32) << 16 | (5u8 as u32) << 8 | (1u8 as u32);
let version_v2 = 1u64 << 48 | 5u64 << 32 | 0u64 << 16 | 1u64;
framed.send(MumbleMessage::Version { data: Version { framed.send(MumbleMessage::Version { data: Version {
os: Some(String::from("Linux")), os: Some(String::from("Linux")),
os_version: Some(String::from("os version")), os_version: Some(String::from("os version")),
release: Some(String::from("release")), release: Some(String::from("release")),
version_v1: None, version_v1: Some(version_v1),
version_v2: Some(0) version_v2: Some(version_v2)
}}).await?; }}).await?;
let udp_sock = self.create_udp_connection().await?;
let mut buf = [0; 1024];
while let Some(frame_result) = framed.next().await {
if let Ok(message) = frame_result {
println!("Receive: {message:?}");
match message {
MumbleMessage::Version { data } => {
framed.send(MumbleMessage::Authenticate { data: Authenticate { framed.send(MumbleMessage::Authenticate { data: Authenticate {
username:Some(String::from("Bot")), username:Some(String::from("Botty")),
password: None, password: Some(String::new()),
tokens: vec![], tokens: vec![],
celt_versions: vec![], celt_versions: vec![],
opus: Some(true), opus: Some(true),
client_type: Some(1), client_type: Some(1),
}}).await?; }}).await?;
/*framed.send(MumbleMessage::Ping {data: Ping {
good: None,
late: None,
lost: None,
resync: None,
tcp_packets: None,
tcp_ping_avg: None,
tcp_ping_var: None,
timestamp: None,
udp_packets: None,
udp_ping_avg: None,
udp_ping_var: None
}}).await?;*/
}
_ => {
println!("{message:?}");
}
}
}
}
Ok(()) Ok(framed)
} }
} }

View file

@ -1,8 +1,54 @@
use mumble_rs::MumbleClient; use std::net::ToSocketAddrs;
use std::time::{Duration, SystemTime};
use futures::{SinkExt, StreamExt};
use mumble_rs::{MumbleClient, MumbleMessage};
use mumble_rs::proto::Ping;
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
let mut client = MumbleClient::new("127.0.0.1:64738".parse()?); let mut client = MumbleClient::new("127.0.0.1:64738".to_socket_addrs()?.next().unwrap());
client.connect().await?; let (mut mumble_send, mut mumble_recv) = client.connect().await?.split();
let mut server_synced = false;
let (mut conn_send_channel_send, mut conn_send_channel_recv) = tokio::sync::mpsc::channel::<MumbleMessage>(10);
tokio::spawn(async move {
while !conn_send_channel_send.is_closed() {
conn_send_channel_send.send(MumbleMessage::Ping {data: Ping {
good: None,
late: None,
lost: None,
resync: None,
tcp_packets: None,
tcp_ping_avg: None,
tcp_ping_var: None,
timestamp: Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()),
udp_packets: None,
udp_ping_avg: None,
udp_ping_var: None
}}).await.unwrap();
tokio::time::sleep(Duration::from_secs(10)).await;
}
});
tokio::spawn(async move {
while let Some(message) = conn_send_channel_recv.recv().await {
mumble_send.send(message).await.unwrap();
}
});
while let Some(frame_result) = mumble_recv.next().await {
if let Ok(message) = frame_result {
match message {
MumbleMessage::UserState {data} => {
if !server_synced { continue }
println!("{data:?}")
}
MumbleMessage::ServerSync {data} => {
server_synced = true;
}
_ => {}
}
}
}
Ok(()) Ok(())
} }