Как можно передавать значения из одной функции в разные потоки?

  • 14 августа 2023 г. 9:36
Ответы на вопрос 4
Для передачи значений из одной функции в разные потоки вам потребуется использовать многопоточные примитивы. В языке Rust вы можете использовать потокобезопасные каналы (mpsc), которые уже встроены в стандартную библиотеку.

Чтобы реализовать автоматическое пайплайнинг, вам понадобятся два канала: один для передачи значений между функциями Get, и второй для выполнения запроса на сервер (Exec). Вы можете использовать один отправитель для каждого канала и передавать этот отправитель в функции Get.

Вот пример такой реализации:

```rust
use std::sync::mpsc;
use std::thread;

fn get(rx: mpsc::Receiver<u32>, tx: mpsc::Sender<u32>) {
    for msg in rx {
        // Обработка значения
        let result = msg + 1;

        // Отправка значения следующей функции
        tx.send(result).unwrap();
    }
}

fn main() {
    // Создание каналов для передачи значений
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();

    // Создание потоков для выполнения функций Get
    let thread1 = thread::spawn(move || {
        get(rx1, tx2);
    });

    // Отправка значений в первую функцию Get
    for value in 1..=256 {
        tx1.send(value).unwrap();
    }

    // Необходимое количество отправленных сообщений
    drop(tx1);

    // Получение результатов из второй функции Get
    for result in rx2 {
        // Обработка результата
        println!("Result: {}", result);
    }

    thread1.join().unwrap();
}
```

В этом примере мы создаем два канала для передачи значений между функциями Get. Затем мы создаем поток для выполнения первой функции Get и отправляем значения в нее через канал tx1. Завершая отправление значений, мы закрываем канал tx1, чтобы функция get поняла, что все значения были отправлены. Затем мы получаем результаты из второй функции Get через канал rx2.

Этот пример должен быть гораздо эффективнее, чем использование вектора отправителей, так как каждая функция Get будет иметь свою собственную очередь сообщений. Если ваш код выполняется медленно, возможно, есть другие места, в которых нужна оптимизация.
fn test() {
    let mut channels = Arc::new(Mutex::new(Vec::with_capacity(PAR)));
    let mut joins = Vec::with_capacity(PAR);
    
    for _ in 0..N / PAR {
        for _ in 0..PAR {
            let mut channels = Arc::clone(&channels);
            joins.push(thread::spawn(move || {
                get(channels.lock().unwrap());
            }));
        }
    }
    
    for j in joins {
        j.join().unwrap();
    }
}

#[inline(always)]
fn get(mut channels: MutexGuard>>) -> i32 {
    let (tx, rx) = mpsc::channel();
    channels.push(tx);
    
    if channels.len() == PAR {
        exec(channels);
    } else {
        drop(channels);
    }
    
    rx.recv().unwrap()
}

#[inline(always)]
fn exec(mut channels: MutexGuard>>) {
    let mut i = 0;
    
    for c in channels.iter() {
        i += 1;
        c.send(1).unwrap();
    }
    
    println!("{}", i);
    channels.clear();
}


fn test() {
    let (tx, rx) = mpsc::channel::>();
    let mut handles = Vec::with_capacity(N + 1);
    
    handles.push(thread::spawn(move || exec(rx)));
    
    for _ in 0..N {
        let tx = tx.clone();
        
        handles.push(thread::spawn(move || {
            get(tx);
        }))
    }
    
    drop(tx);
    
    for handle in handles {
        handle.join().unwrap();
    }
}

fn get(sender: mpsc::Sender>) -> i32 {
    let (tx, rx) = mpsc::channel();
    sender.send(tx).unwrap();
    
    rx.recv().unwrap()
}

fn exec(receiver: mpsc::Receiver>) {
    let mut channels = Vec::with_capacity(PAR);
    
    loop {
        for _ in 0..PAR {
            if let Ok(tx) = receiver.recv() {
                channels.push(tx);
            } else {
                return;
            }
        }
        
        let mut i = 0;
        
        for c in channels.iter() {
            i += 1;
            c.send(1).unwrap();
        }
        
        println!("{}", i);
        channels.clear();
    }
}


async fn test() {
    let (tx, rx) = mpsc::unbounded_channel::>();
    let mut handles = Vec::with_capacity(N + 1);
    
    handles.push(tokio::spawn(async move { exec(rx).await }));
    
    for _ in 0..N {
        let tx = tx.clone();
        
        handles.push(tokio::spawn(async move {
            get(tx).await;
        }))
    }
    
    drop(tx);
    
    for handle in handles {
        handle.await.unwrap();
    }
}

async fn get(sender: mpsc::UnboundedSender>) -> i32 {
    let (tx, mut rx) = mpsc::unbounded_channel();
    sender.send(tx).unwrap();
    
    rx.recv().await.unwrap()
}

async fn exec(mut receiver: mpsc::UnboundedReceiver>) {
    let mut channels = Vec::with_capacity(PAR);
    
    loop {
        for _ in 0..PAR {
            if let Some(tx) = receiver.recv().await {
                channels.push(tx);
            } else {
                return;
            }
        }
        
        let mut i = 0;
        
        for c in channels.iter() {
            i += 1;
            c.send(1).unwrap();
        }
        
        println!("{}", i);
        channels.clear();
    }
}
Похожие вопросы