/** * Copyright (c) 2006 - 2009 Smaxe Ltd (www.smaxe.com). * All rights reserved. */ import com.smaxe.io.ByteArray; import com.smaxe.uv.client.ICamera; import com.smaxe.uv.client.IMicrophone; import com.smaxe.uv.client.INetConnection; import com.smaxe.uv.client.INetStream; import com.smaxe.uv.client.License; import com.smaxe.uv.client.NetConnection; import com.smaxe.uv.client.NetStream; import com.smaxe.uv.client.microphone.AbstractMicrophone; import com.smaxe.uv.stream.support.MediaDataByteArray; import lt.dkd.nellymoser.CodecImpl; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.sound.sampled.AudioFormat; import javax.sound.sampled.AudioSystem; import javax.sound.sampled.DataLine; import javax.sound.sampled.TargetDataLine; /** * <code>ExVoicePublisher</code> - publishes captured voice to the RTMP server. * <p> Note: * <br> - It encodes audio using Nellymoser ASAO codec implementation mentioned * at http://ffmpeg.org/ (June 16, 2008 news). * <br> - Nellymoser ASAO codec is available at <a href="http://www.smaxe.com/source.jsf?id=lt/dkd/nellymoser/CodecImpl.java" target="_blank">Nellymoser ASAO (Java class)</a> * <br> - Desktop publisher example is available at <a href="http://www.smaxe.com/source.jsf?id=ExDesktopPublisher.java" target="_blank">Desktop publisher (Java class)</a> * * @author Andrei Sochirca */ public final class ExVoicePublisher extends Object { /** * Entry point. * * @param args * @throws Exception if an exception occurred */ public static void main(final String[] args) throws Exception { // NOTE: // you can get Evaluation Key at: // http://www.smaxe.com/order.jsf#request_evaluation_key // or buy at: // http://www.smaxe.com/order.jsf License.setKey("SET-YOUR-KEY"); final String url = "rtmp://localhost:1935/live"; final Microphone microphone = new Microphone(); microphone.start(); new Thread(new Runnable() { public void run() { Publisher publisher = new Publisher(); publisher.publish(url, "voice", microphone, null /*camera*/); } }).start(); } /** * <code>Microphone</code> - {@link IMicrophone} implementation that captures audio device stream. */ public final static class Microphone extends AbstractMicrophone { /** * <code>CaptureRunnable</code> - {@link Runnable} implementation * that captures audio. */ private final class CaptureRunnable extends Object implements Runnable { // fields private volatile boolean stopped = false; private ExecutorService executor = null; /** * Constructor. */ public CaptureRunnable() { executor = Executors.newSingleThreadExecutor(); } /** * Starts the capture. */ public void start() { stopped = false; } /** * Stops the capture. */ public void stop() { stopped = true; } /** * Releases the capture resources. */ public void release() { executor.shutdown(); } // Runnable implementation public void run() { final AudioFormat audioFormat = new AudioFormat(8000f /*sample rate*/, 16 /*sample size in bits*/, 1 /*channels*/, true /*signed*/, true /*big endian*/); final DataLine.Info dataLineInfo = new DataLine.Info(TargetDataLine.class, audioFormat); try { final TargetDataLine targetDataLine = (TargetDataLine) AudioSystem.getLine(dataLineInfo); // opens line if necessary if (!targetDataLine.isOpen()) { targetDataLine.open(); } // starts data line targetDataLine.start(); Thread capture = new Thread(new Runnable() { public void run() { while (!stopped) { byte[] buf = new byte[512]; targetDataLine.read(buf, 0, buf.length); encode(buf); } } }); capture.start(); while (!stopped) { try { Thread.sleep(1 * 1000); } catch (Exception e) {/*ignore*/} } } catch (Exception e) { e.printStackTrace(); } } // inner use methods private final float[] state = new float[64]; /** * Encodes audio data. * * @param audio */ private void encode(final byte[] audio) { executor.execute(new Runnable() { public void run() { byte[] encoded = new byte[64]; CodecImpl.encode(state, toFloats(audio), encoded); byte[] data = new byte[1 + encoded.length]; data[0] = 82;// Audio.TAG_NELLYMOSER; System.arraycopy(encoded, 0, data, 1, encoded.length); fireOnAudioData(new MediaDataByteArray(32, new ByteArray(data))); } }); } /** * @param bytes * @return floats */ private float[] toFloats(final byte[] bytes) { float[] floats = new float[bytes.length >> 1]; for (int i = 0, n = floats.length; i < n; i++) { floats[i] = (bytes[i * 2 + 0] << 8) + (bytes[i * 2 + 1] << 0); } return floats; } } // fields private CaptureRunnable capture = null; private Thread t = null; /** * Constructor. */ public Microphone() { capture = new CaptureRunnable(); } /** * Starts audio capture. */ public void start() { if (t == null) { t = new Thread(capture); t.start(); } capture.start(); } /** * Stops video capture. */ public void stop() { capture.stop(); } /** * Releases the resources. */ public void release() { capture.release(); t = null; } } /** * <code>Publisher</code> - publisher. */ public static final class Publisher extends Object { /** * <code>NetConnectionListener</code> - {@link NetConnection} listener implementation. */ private final class NetConnectionListener extends NetConnection.ListenerAdapter { /** * Constructor. */ public NetConnectionListener() { } @Override public void onAsyncError(final INetConnection source, final String message, final Exception e) { System.out.println("Publisher#NetConnection#onAsyncError: " + message + " " + e); } @Override public void onIOError(final INetConnection source, final String message) { System.out.println("Publisher#NetConnection#onIOError: " + message); } @Override public void onNetStatus(final INetConnection source, final Map<String, Object> info) { System.out.println("Publisher#NetConnection#onNetStatus: " + info); final Object code = info.get("code"); if (NetConnection.CONNECT_SUCCESS.equals(code)) { } else { disconnected = true; } } } // fields private volatile boolean disconnected = false; /** * Publishes the stream. * * @param url * @param streamName * @param microphone microphone * @param camera camera */ public void publish(final String url, final String streamName, final IMicrophone microphone, final ICamera camera) { final NetConnection connection = new NetConnection(); connection.configuration().put(NetConnection.Configuration.INACTIVITY_TIMEOUT, -1); connection.configuration().put(NetConnection.Configuration.RECEIVE_BUFFER_SIZE, 256 * 1024); connection.configuration().put(NetConnection.Configuration.SEND_BUFFER_SIZE, 256 * 1024); connection.addEventListener(new NetConnectionListener()); connection.connect(url); // wait till connected while (!connection.connected() && !disconnected) { try { Thread.sleep(100); } catch (Exception e) {/*ignore*/} } if (!disconnected) { final NetStream stream = new NetStream(connection); stream.addEventListener(new NetStream.ListenerAdapter() { @Override public void onNetStatus(final INetStream source, final Map<String, Object> info) { System.out.println("Publisher#NetStream#onNetStatus: " + info); final Object code = info.get("code"); if (NetStream.PUBLISH_START.equals(code)) { if (microphone != null) { stream.attachAudio(microphone); } if (camera != null) { stream.attachCamera(camera, -1 /*snapshotMilliseconds*/); } } } }); stream.publish(streamName, NetStream.LIVE); } while (!disconnected) { try { Thread.sleep(100); } catch (Exception e) {/*ignore*/} } connection.close(); } } }